ITABLES¶

In [1]:
import time

start_time = time.time()  # Record the start time

# to enable itables
from itables import init_notebook_mode

init_notebook_mode(all_interactive=True)

DELTA SPARK CONFIGURATION¶

In [2]:
import os
import shutil

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession


def ensure_directories_exist(warehouse_dir, metastore_db_path):
    """
    Ensures the necessary directories for the warehouse and metastore exist.

    Parameters:
        warehouse_dir (str): Path to the warehouse directory (Spark catalog).
        metastore_db_path (str): Path to the metastore database.
    """
    os.makedirs(warehouse_dir, exist_ok=True)
    os.makedirs(os.path.dirname(metastore_db_path), exist_ok=True)

def create_spark_session(
    app_name="DeltaCatalog",
    warehouse_dir="./warehouse-spark/spark_catalog",
    metastore_db_path="./warehouse-spark/metastore_db",
):
    """
    Creates and initializes a SparkSession with Delta Lake support and persistent metastore.

    Parameters:
        app_name (str): Name of the Spark application.
        warehouse_dir (str): Path to the Spark catalog warehouse directory.
        metastore_db_path (str): Path to the persistent metastore database (Derby).

    Returns:
        SparkSession: Configured SparkSession instance.
    """
    # Ensure required directories exist
    ensure_directories_exist(warehouse_dir, metastore_db_path)

    # Configure SparkSession with Delta Lake
    builder = (
        SparkSession.builder.appName(app_name)
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
        .config("spark.sql.parquet.compression.codec", "gzip")
        .config("spark.databricks.delta.optimizeWrite.enabled", "true")
        .config("spark.databricks.delta.autoCompact.enabled", "true")
        .config("spark.sql.warehouse.dir", os.path.abspath(warehouse_dir))
        .config(
            "javax.jdo.option.ConnectionURL",
            f"jdbc:derby:{os.path.abspath(metastore_db_path)};create=true",
        )
    )  # .config("spark.sql.catalogImplementation", "hive") \
    # .enableHiveSupport()

    # Initialize Spark with Delta
    spark = configure_spark_with_delta_pip(builder).getOrCreate()

    print(
        f"SparkSession created with Delta and persistent metastore at: {warehouse_dir}"
    )
    return spark

CREATE DATABASE AND DELTA TABLES¶

In [3]:
def create_database(spark_session, database_name):
    """
    Creates a database if it does not already exist.

    Parameters:
        spark_session (SparkSession): The active SparkSession.
        database_name (str): The name of the database to be created.
    """
    spark_session.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
    print(f"Database '{database_name}' created or already exists.")


def list_all_databases_and_tables(spark_session):
    """
    Lists all databases and their tables in Spark.

    Parameters:
        spark_session (SparkSession): The active SparkSession.

    Returns:
        dict: A dictionary where keys are database names and values are lists of table names in each database.
    """
    # List all databases
    databases = spark_session.catalog.listDatabases()

    # Create a dictionary to store database names and their corresponding table lists
    database_tables = {}

    print("The following databases and tables are present in the Spark Catalog.")
    print()

    for database in databases:
        # Set the current database to the specified database
        spark_session.sql(f"USE {database.name}")

        # List all tables in the database
        tables = spark_session.catalog.listTables(database.name)

        # Extract table names from the list of table objects
        table_names = [table.name for table in tables]

        # Store the tables for the current database
        database_tables[database.name] = table_names

        # Print the database and tables
        for table in table_names:
            print(f"Database: {database.name}, Table: {table}")
            print()

    return database_tables

CREATE SPARK DATAFRAME¶

In [4]:
def create_dataframe_from_list_dict_using_alphabetical_order_from_columns(
    spark_session, list_data_dict
):
    """
    Creates a Spark DataFrame from a list of dictionaries, reordering columns in alphabetical order.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        list_data_dict (list): A list of dictionaries, where each dictionary represents a row.

    Returns:
        DataFrame: The created Spark DataFrame with columns in alphabetical order.
    """
    if list_data_dict:
        df = spark_session.createDataFrame(list_data_dict)
        return df
    else:
        raise ValueError("The input list is empty.")


def create_dataframe_from_list_dict(spark_session, list_data_dict):
    """
    Creates a Spark DataFrame from a list of dictionaries, preserving the order of the keys.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        list_data_dict (list): A list of dictionaries, where each dictionary represents a row.

    Returns:
        DataFrame: The created Spark DataFrame with columns in the order of the keys.
    """
    if not list_data_dict:
        raise ValueError("The input list is empty.")

    # Get the order of keys from the first dictionary
    columns_order = list(list_data_dict[0].keys())

    # Create the DataFrame and reorder columns
    df = spark_session.createDataFrame(list_data_dict)
    df = df.select(*columns_order)  # Reorder columns explicitly

    return df


def split_spark_dataframe(spark_dataframe, num_parts):
    """
    Splits a Spark DataFrame into the specified number of parts, ensuring each part has at least one row.
    If the requested number of parts exceeds the total rows, it creates as many balanced parts as possible.

    Parameters:
        spark_dataframe (DataFrame): The Spark DataFrame to be split.
        num_parts (int): The desired number of parts to split the DataFrame into.

    Returns:
        List[DataFrame]: A list containing the split DataFrames.
    """
    total_rows = spark_dataframe.count()

    if total_rows == 0:
        print("The DataFrame is empty. No parts created.")
        return []

    # Adjust number of parts if more parts are requested than rows
    actual_parts = min(num_parts, total_rows)

    # Calculate base rows per part and distribute remaining rows
    rows_per_part = total_rows // actual_parts
    extra_rows = total_rows % actual_parts

    split_dataframes = []
    start_row = 0
    
    for i in range(actual_parts):
        # Calculate rows for the current part
        rows_in_this_part = rows_per_part + (1 if i < extra_rows else 0)
        end_row = start_row + rows_in_this_part
        
        # Select the rows for the current part using offset and limit
        split_dataframes.append(
            spark_dataframe.offset(start_row).limit(rows_in_this_part)
        )
        
        start_row = end_row  # Update start row for the next part

    print(f"Successfully created {len(split_dataframes)} DataFrames.")
    return split_dataframes

CREATE DELTA TABLES¶

In [5]:
import os
import shutil

def create_delta_table_with_spark_dataframe_and_register(
    spark_session, database_name, table_name, spark_dataframe, warehouse_dir, partition_by=None
):
    """
    Creates a Delta table in the specified database and registers it with SQL.

    Parameters:
        spark_session (SparkSession): The active SparkSession.
        database_name (str): The name of the database where the table will be created.
        table_name (str): The name of the table to be created.
        spark_dataframe (DataFrame): The Spark DataFrame whose data will be used to create the table.
        warehouse_dir (str): The root directory for the warehouse.
        partition_by (list, optional): List of column names to partition the table by (default is None).
    """
    # Ensure the database exists or create it
    available_databases = [db.name for db in spark_session.catalog.listDatabases()]
    if database_name not in available_databases:
        spark_session.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

    # Set the active database
    spark_session.sql(f"USE {database_name}")

    # Define the table path based on the database and table name
    table_path = f"{warehouse_dir}/{database_name}/{table_name}"

    write_options = spark_dataframe.write.format("delta").mode("overwrite")

    # Apply partitioning if specified
    if partition_by:
        write_options.partitionBy(*partition_by)

    # Save the DataFrame as a Delta table in the location
    write_options.save(table_path)

    # Register the Delta table in the Spark catalog using SQL
    spark_session.sql(
        f"""
        CREATE OR REPLACE TABLE {table_name}
        USING DELTA
        AS
        SELECT * FROM parquet.`{table_path}`;
    """
    )

    # Delete the files generated in the table path
    if os.path.exists(table_path):
        shutil.rmtree(table_path)
        print(f"Temporary files at '{table_path}' have been deleted.")
    else:
        print(f"No temporary files found at '{table_path}'.")

    print(
        f"Table '{table_name}' created and registered at '{table_path}' in database '{database_name}' with partitioning by {partition_by if partition_by else 'None'}."
    )
    
def save_dataframe_as_parquet(spark_dataframe, file_name, file_path, partition_by=None):
    """
    Saves a Spark DataFrame to the specified path in Parquet format.

    Parameters:
        spark_dataframe (DataFrame): The Spark DataFrame to be saved.
        file_name (str): The name of the file (or dataset) to be created.
        file_path (str): The location where the file will be stored.
        partition_by (list, optional): List of column names to partition the data by (default is None).
    """
    write_options = spark_dataframe.write.format("parquet").mode("overwrite")

    if partition_by:
        write_options.partitionBy(*partition_by)
        
    write_options.save(file_path)
    print(f"DataFrame saved as Delta Parquet at '{file_path}/{file_name}' with partitioning by {partition_by if partition_by else 'None'}.")

def save_dataframe_as_delta_parquet(spark_dataframe, file_name, file_path, partition_by=None):
    """
    Saves a Spark DataFrame to the specified path in Delta Parquet format.

    Parameters:
        spark_dataframe (DataFrame): The Spark DataFrame to be saved.
        file_name (str): The name of the file (or dataset) to be created.
        file_path (str): The location where the file will be stored.
        partition_by (list, optional): List of column names to partition the data by (default is None).
    """
    write_options = spark_dataframe.write.format("delta").mode("overwrite")

    # Apply partitioning if partition_by is provided
    if partition_by:
        write_options.partitionBy(*partition_by)

    # Save the DataFrame as a Delta Parquet file at the specified location
    write_options.save(f"{file_path}/{file_name}")

    print(f"DataFrame saved as Delta Parquet at '{file_path}/{file_name}' with partitioning by {partition_by if partition_by else 'None'}.")


def create_delta_table_in_database(spark_session, database_name, table_name, spark_dataframe, warehouse_dir, partition_by=None):
    """
    Creates a Delta table in a Spark database, ensuring that any existing table or directory is deleted before creating
    the new table. The table is created with the delta.enableChangeDataFeed property enabled.

    Parameters:
    spark_session (SparkSession): The active Spark session to run SQL queries.
    database_name (str): The name of the database where the table will be stored.
    table_name (str): The name of the table to be created.
    spark_dataframe (DataFrame): The DataFrame to be saved as a Delta table.
    warehouse_dir (str): The base directory where Delta tables are stored.
    partition_by (list, optional): List of columns to partition the table by. If not provided, the table will not be partitioned.

    Exceptions:
    - If the specified partition columns do not exist in the DataFrame, a ValueError will be raised.
    """
    
    # Define the path for the Delta table
    table_path = f"{warehouse_dir}/{database_name}/{table_name}"
    full_table_name = f"{database_name}.{table_name}"

    # Remove the table directory if it exists (manually)
    if os.path.exists(table_path):
        shutil.rmtree(table_path)  # Remove the directory and its contents
        print(f"Directory for table '{full_table_name}' deleted.")

    # Drop the table from the database if it exists (to avoid conflicts)
    try:
        spark_session.sql(f"DROP TABLE IF EXISTS {full_table_name}")
        print(f"Table '{full_table_name}' dropped.")
    except Exception as e:
        print(f"Could not drop the table: {str(e)}")

    # Write the new Delta table with overwrite mode
    write_options = spark_dataframe.write.format("delta").mode("overwrite")

    # Check if partitioning is specified and verify columns exist in the DataFrame
    if partition_by:
        missing_columns = [col for col in partition_by if col not in spark_dataframe.columns]
        if missing_columns:
            raise ValueError(f"The following partition columns do not exist in the DataFrame: {missing_columns}")
        write_options = write_options.partitionBy(*partition_by)

    # Save the Delta table in the database
    write_options.saveAsTable(full_table_name)

    # Enable Change Data Feed on the Delta table
    try:
        spark_session.sql(f"ALTER TABLE {full_table_name} SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')")
        print(f"Property 'delta.enableChangeDataFeed' enabled for table '{full_table_name}'.")
    except Exception as e:
        print(f"Error enabling 'delta.enableChangeDataFeed': {str(e)}")

    print(f"Table '{table_name}' created in database '{database_name}' at '{table_path}' with partitioning by {partition_by if partition_by else 'none'}.")

def read_parquet_or_delta_file(spark_session, directory_path):
    """
    Reads a Parquet or Delta file from the specified directory path.

    This function attempts to read a Parquet file from the provided path using the Spark session. 
    If the operation fails (e.g., the file does not exist or the format is incompatible), 
    an error message is displayed, and None is returned.

    Parameters:
        spark_session (SparkSession): The active Spark session used for reading the file.
        directory_path (str): The path to the directory containing the Parquet or Delta file.

    Returns:
        DataFrame: The DataFrame containing the file's data if successfully read.
        None: If an error occurs during the file reading process.

    Example:
        # Create a Spark session
        spark = SparkSession.builder.appName("ReadExample").getOrCreate()

        # Read a Parquet or Delta file
        df = read_parquet_or_delta_file(spark, "/path/to/directory")

        if df is not None:
            df.show()
        else:
            print("Failed to read the file.")
    """
    try:
        df = spark_session.read.parquet(directory_path)
        print(f"Successfully read Parquet file from '{directory_path}'.")
        return df
    except Exception as e:
        print(f"Error reading file from '{directory_path}': {e}")
        return None
        
def execute_spark_sql_query(
    spark_session: SparkSession, query: str
) -> "pyspark.sql.dataframe.DataFrame":
    """
    Execute a SQL query and return the results as a DataFrame.

    :param spark_session: The SparkSession object.
    :param query: The SQL query to execute.
    :return: A PySpark DataFrame containing the query results.
    """
    return spark_session.sql(query)

CRUD SPARK DATAFRAMES¶

In [6]:
from pyspark.sql.functions import expr, col
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
import pandas as pd
from pandas import Timestamp

def is_delta_table(spark_session: SparkSession, delta_table_path: str) -> bool:
    """
    Checks if a given path corresponds to a Delta table.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        delta_table_path (str): The file path to check.

    Returns:
        bool: True if the path is a Delta table, False otherwise.
    """
    return (
        DeltaTable.forPath(spark_session, delta_table_path)
        if DeltaTable.isDeltaTable(spark_session, delta_table_path)
        else False
    )


def restore_delta_lake_to_version(
    spark_session: SparkSession, delta_table_path: str, version: int = None, timestamp: str = None
):
    """
    Restores a Delta table to a specific version or timestamp.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        delta_table_path (str): The Delta table path.
        version (int, optional): The version to restore to.
        timestamp (str, optional): The timestamp to restore to in ISO format.

    Returns:
        pandas.DataFrame: The updated Delta table details or history.
    """
    try:
        delta_table = is_delta_table(spark_session, delta_table_path)

        if delta_table:
            delta_table.optimize().executeCompaction()
            history_df = delta_table.history().toPandas()
            available_versions = history_df["version"].tolist()
            available_timestamps = history_df["timestamp"].tolist()

            if version is not None:
                if version not in available_versions:
                    print(f"Error: Version {version} does not exist.")
                    return history_df
                delta_table.restoreToVersion(version)
                print(f"Restored to version {version}.")
                return delta_table.detail().toPandas()

            if timestamp is not None:
                timestamp_obj = Timestamp(timestamp)
                if timestamp_obj not in available_timestamps:
                    print(f"Error: Timestamp {timestamp} does not exist.")
                    return history_df
                delta_table.restoreToTimestamp(timestamp)
                print(f"Restored to timestamp {timestamp}.")
                return delta_table.detail().toPandas()

            return history_df
        else:
            print(f"{delta_table_path} does not contain a Delta table.")
    except Exception as e:
        print(f"Error restoring Delta table: {str(e)}")

def write_into_delta_lake(
    spark_session: SparkSession, delta_table_path: str, spark_dataframe
):
    """
    Writes data into a Delta table, avoiding duplicates by comparing existing columns 
    and adding new columns if necessary.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        delta_table_path (str): The Delta table path.
        spark_dataframe (DataFrame): The Spark DataFrame to write.

    Returns:
        None
    """
    try:
        # Check if the path corresponds to a Delta table
        delta_table = is_delta_table(spark_session, delta_table_path)

        if delta_table:
            # Optimize the Delta table before processing
            delta_table.optimize().executeCompaction()

            # Load the existing data from the Delta table
            existing_data = spark_session.read.format("delta").load(delta_table_path)

            # Identify common columns between the existing data and the new data
            common_columns = list(set(existing_data.columns).intersection(set(spark_dataframe.columns)))

            # Join the existing data with the new data on the common columns
            # We will use 'left_anti' to find new records based on differences
            new_rows = spark_dataframe.join(
                existing_data, on=common_columns, how="left_anti"
            )

            # If there are new rows, append them to the table
            if not new_rows.isEmpty():
                # Write the new rows (with new columns if present)
                new_rows.write.option("mergeSchema", "true").mode("append").format("delta").save(delta_table_path)
                print("Added new data without duplicates.")
            else:
                print("No new rows to append.")
        else:
            # If the Delta table does not exist, create a new table
            print(f"{delta_table_path} does not contain a Delta table.")
            spark_dataframe.write.format("delta").mode("overwrite").save(delta_table_path)
            print("Created Delta table with new data.")
    except Exception as e:
        print(f"Error writing to Delta table: {str(e)}")

def delete_from_delta_lake(
    spark_session: SparkSession, delta_table_path: str, condition: str
):
    """
    Deletes rows from a Delta table based on a condition.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        delta_table_path (str): The Delta table path.
        condition (str): The condition for deletion.

    Returns:
        pandas.DataFrame: Updated Delta table details.
    """
    try:
        delta_table = is_delta_table(spark_session, delta_table_path)

        if delta_table:
            delta_table.optimize().executeCompaction()
            preview_df = delta_table.toDF().filter(condition).limit(1).collect()

            if not preview_df:
                print(f"No records match the condition '{condition}'.")
                return delta_table.toDF().filter(condition).toPandas()

            delta_table.delete(condition)
            print(f"Deleted records with condition '{condition}'.")
            return delta_table.detail().toPandas()
        else:
            print(f"{delta_table_path} does not contain a Delta table.")
    except Exception as e:
        print(f"Error deleting from Delta table: {str(e)}")


def update_from_delta_lake(
    spark_session: SparkSession, delta_table_path: str, condition: str, set_expression: dict
):
    """
    Updates rows in a Delta table based on a condition.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        delta_table_path (str): The Delta table path.
        condition (str): The condition for updating rows.
        set_expression (str): The update expression (e.g., "column = value").

    Returns:
        pandas.DataFrame: Updated Delta table details.
    """
    try:
        delta_table = is_delta_table(spark_session, delta_table_path)

        if delta_table:
            delta_table.optimize().executeCompaction()
            preview_df = delta_table.toDF().filter(condition).limit(1).collect()

            if not preview_df:
                print(f"No records match the condition '{condition}'.")
                return delta_table.toDF().filter(condition).toPandas()

            delta_table.update(condition=expr(condition), set=set_expression)
            print(f"Updated records with condition '{condition}'.")
            return delta_table.detail().toPandas()
        else:
            print(f"{delta_table_path} does not contain a Delta table.")
    except Exception as e:
        print(f"Error updating Delta table: {str(e)}")

def merge_from_delta_lake(
    spark_session: SparkSession, delta_table_path: str, sync_data_df, identifier_column: str
):
    """
    Merges synchronized data into a Delta table.

    This function performs a merge operation to update existing records or insert new ones
    from the source DataFrame into the target Delta table. The merge is based on a specified
    unique identifier column.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        delta_table_path (str): The file path to the Delta table.
        sync_data_df (DataFrame): The source DataFrame containing the synchronized data.
        identifier_column (str): The column used as the unique identifier for the merge.

    Returns:
        None

    Example:
        >>> merge_from_delta_lake(spark_session, "/path/to/delta", sync_data_df, "id")
        Merge completed.
    """
    try:
        # Check if the path corresponds to a Delta table
        delta_table = is_delta_table(spark_session, delta_table_path)

        if delta_table:
            # Optimize the Delta table before the merge
            delta_table.optimize().executeCompaction()

            # Perform the merge operation
            delta_table.alias("sync").merge(
                sync_data_df.alias("source"),
                f"sync.{identifier_column} = source.{identifier_column}",
            ).whenMatchedUpdate(
                set={col_name: col(f"source.{col_name}") for col_name in sync_data_df.columns}
            ).whenNotMatchedInsert(
                values={col_name: col(f"source.{col_name}") for col_name in sync_data_df.columns}
            ).execute()

            print("Merge operation completed successfully.")
        else:
            # Create the Delta table if it does not exist
            print(f"The directory '{delta_table_path}' does not contain a Delta table.")
            sync_data_df.write.format("delta").mode("overwrite").save(delta_table_path)
            print(f"Delta table created at '{delta_table_path}' with synchronized data.")

    except Exception as e:
        print(f"Error during the merge operation on the Delta table '{delta_table_path}': {str(e)}")

def show_historic_version_from_delta_file(
    spark_session, file_path, version=None, operation_filter=None, sort_by=None
):
    """
    Show historic changes from a Delta table version, handling column mismatches.
    """
    try:
        from pyspark.sql.functions import lit

        # Retrieve Delta Table and its history
        delta_table = DeltaTable.forPath(spark_session, file_path)
        history_df = delta_table.history()

        if operation_filter:
            # Filter history based on operation type
            result_df = history_df.filter(
                history_df.operation.contains(operation_filter)
            ).toPandas()
        elif version is not None:
            if version < 0 or version >= history_df.count():
                result_df = history_df.toPandas()
            else:
                # Get metadata for the requested version
                history_row = history_df.filter(history_df.version == version).collect()[0]
                operation, timestamp, user = (
                    history_row.operation,
                    history_row.timestamp,
                    history_row.userName or "Unknown",
                )

                # Load current and previous versions of the Delta table
                df_current = (
                    spark_session.read.format("delta")
                    .option("versionAsOf", version)
                    .load(file_path)
                )
                df_previous = (
                    spark_session.read.format("delta")
                    .option("versionAsOf", version - 1)
                    .load(file_path)
                    if version > 0
                    else None
                )

                # Align columns
                if df_previous:
                    # Get all unique columns from both DataFrames
                    current_columns = set(df_current.columns)
                    previous_columns = set(df_previous.columns)

                    # Add missing columns to each DataFrame with null values
                    for col in previous_columns - current_columns:
                        df_current = df_current.withColumn(col, lit(None))
                    for col in current_columns - previous_columns:
                        df_previous = df_previous.withColumn(col, lit(None))

                    # Reorder columns to ensure the same order in both DataFrames
                    common_columns = sorted(list(current_columns | previous_columns))
                    df_current = df_current.select(common_columns)
                    df_previous = df_previous.select(common_columns)

                # Handle operations
                if operation == "UPDATE":
                    df_removed = (
                        df_previous.subtract(df_current).toPandas()
                        if df_previous
                        else None
                    )
                    df_added = (
                        df_current.subtract(df_previous).toPandas()
                        if df_previous
                        else df_current.toPandas()
                    )

                    if df_removed is not None:
                        df_removed["ChangeType"] = "PRE UPDATE"
                    if df_added is not None:
                        df_added["ChangeType"] = "UPDATE"

                    result_df = pd.concat([df_removed, df_added]).reset_index(drop=True)
                elif operation == "DELETE" and df_previous:
                    result_df = (
                        df_previous.subtract(df_current)
                        .toPandas()
                        .assign(ChangeType="Deleted")
                    )
                else:
                    result_df = (
                        df_current.subtract(df_previous).toPandas()
                        if df_previous
                        else df_current.toPandas()
                    )

            # Sort results if requested
            if sort_by and sort_by in result_df.columns:
                result_df = result_df.sort_values(by=sort_by)
            else:
                print(f"Warning: Column '{sort_by}' not found in the DataFrame.")

        else:
            # Return full history if no version is specified
            result_df = history_df.toPandas()

        return result_df

    except Exception as e:
        print(f"Error: {str(e)}. Could not retrieve version or history from the Delta table.")
        return None

def read_delta_table_with_change_data_control(spark_session, delta_table_path, starting_version=0, ending_version=0):
    """
    Reads data from a Delta table with change data capture (CDC) enabled.

    This function loads data from a Delta table and retrieves the changes between specified versions.
    If the provided versions are invalid or if `starting_version` is greater than `ending_version`,
    the function adjusts the versions to ensure the correct range is used.

    Args:
        spark_session (SparkSession): The Spark session used to load the Delta table.
        delta_table_path (str): The path to the Delta table.
        starting_version (int, optional): The starting version for change data retrieval. Defaults to 0.
        ending_version (int, optional): The ending version for change data retrieval. Defaults to 0.

    Returns:
        pandas.DataFrame: A Pandas DataFrame containing the data from the Delta table.
    """
    
    try:
        # Initialize DeltaTable object
        delta_table = DeltaTable.forPath(spark_session, delta_table_path)
        
        # Get the Delta table history to check version range
        history_df = delta_table.history()
        max_version = history_df.select("version").rdd.max()[0]

        # Adjust if starting_version is greater than ending_version
        if starting_version > ending_version:
            starting_version, ending_version = ending_version, starting_version

        # Check if the provided versions are valid
        if 0 <= starting_version <= max_version and 0 <= ending_version <= max_version:
            return (
                spark_session.read
                .format("delta")
                .option("readChangeData", "true")
                .option("startingVersion", starting_version)
                .option("endingVersion", ending_version)
                .load(delta_table_path)
                .toPandas()
            )
        else:
            # Default behavior when versions are out of range
            return (
                spark_session.read
                .format("delta")
                .option("readChangeData", "true")
                .option("startingVersion", 0)
                .load(delta_table_path)
                .toPandas()
            )
    except Exception as e:
        print(f"Error reading Delta table: {e}")
        return None

CRUD VERSION 2¶

In [7]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable

def insert_into_delta_table(spark_session, database_name, table_name, spark_dataframe):
    """
    Inserts records into a Delta table.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        database_name (str): The name of the database where the Delta table exists.
        table_name (str): The name of the Delta table where data will be inserted.
        df (DataFrame): The Spark DataFrame containing the data to insert.

    Returns:
        None
    """
    # Create the full table name with the database
    full_table_name = f"{database_name}.{table_name}"

    # Insert the data into the Delta table in append mode
    spark_dataframe.write.format("delta").mode("append").saveAsTable(full_table_name)

    print(f"Records inserted into Delta table: {full_table_name}")

def delete_from_delta_table(spark_session, database_name, table_name, condition):
    """
    Deletes records from a Delta table based on a specified condition.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        database_name (str): The name of the database where the Delta table exists.
        table_name (str): The name of the Delta table where data will be deleted.
        condition (str): The SQL condition to specify which records to delete.

    Returns:
        None
    """
    from delta.tables import DeltaTable

    try:
        # Create the full table name with the database
        full_table_name = f"{database_name}.{table_name}"

        # Load the Delta table
        delta_table = DeltaTable.forName(spark_session, full_table_name)

        # Perform the delete operation
        delta_table.delete(condition)

        print(f"Records matching condition '{condition}' deleted from Delta table: {full_table_name}")

    except Exception as e:
        print(f"Error: Could not delete records from Delta table. {str(e)}")
        
def update_in_delta_table(spark_session, database_name, table_name, condition, set_dict):
    """
    Updates records in a Delta table based on a specified condition and set clause.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        database_name (str): The name of the database where the Delta table exists.
        table_name (str): The name of the Delta table where data will be updated.
        set_dict (dict): A dictionary where the keys are column names and the values are the new values for those columns.
        condition (str): The SQL condition to specify which records to update.

    Returns:
        None
    """
    try:
        # Create the full table name with the database
        full_table_name = f"{database_name}.{table_name}"

        # Load the Delta table
        delta_table = DeltaTable.forName(spark_session, full_table_name)

        # Convert integer and other literals into Spark SQL expressions using F.lit()
        set_expr = {col: F.lit(value) if isinstance(value, (int, float, str)) else value 
                    for col, value in set_dict.items()}

        # Perform the update operation
        delta_table.update(condition, set_expr)

        print(f"Records matching condition '{condition}' updated in Delta table: {full_table_name}")

    except Exception as e:
        print(f"Error: Could not update records in Delta table. {str(e)}")

FUNCTION TO CREATE JOINS IN SPARK¶

In [8]:
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def join_spark_dataframes(
    left_df: DataFrame,
    right_df: DataFrame,
    join_type: str,
    join_condition: str = None,
    left_column: str = None,
    right_column: str = None,
    return_pandas: bool = False,
    execution_plan: bool = False,
):
    """
    Realiza diferentes tipos de uniones entre dos DataFrames y permite capturar los nombres de las variables reales.
    """
    # Diccionario de alias para cada tipo de unión
    join_aliases = {
        "inner": ["inner", "inner_join"],
        "left": ["left", "left_join"],
        "left_anti": ["left_anti", "anti_left", "anti_left_join"],
        "right": ["right", "right_join"],
        "right_anti": ["right_anti", "anti_right", "anti_right_join"],
        "outer": ["outer", "outer_join"],
        "outer_anti": ["outer_anti", "anti_outer_join", "outer_join_anti"],
        "cross": ["cross", "cross_join"],
        "self": ["self", "self_join"],
        "except": ["except", "exception","exceptAll", "except All", "except_all", "except all"],
        "intersect": ["intersect", "intersection"],
        "union": ["union"],
        "union_all": ["union_all", "union all"],
    }

    # Mapeo único de uniones a su tipo principal
    join_operations = {
        "inner": lambda: left_df.join(
            right_df, left_df[left_column] == right_df[right_column], "inner"
        ),
        "left": lambda: left_df.join(
            right_df, left_df[left_column] == right_df[right_column], "left"
        ),
        "left_anti": lambda: left_df.join(
            right_df, left_df[left_column] == right_df[right_column], "left_anti"
        ),
        "right": lambda: left_df.join(
            right_df, left_df[left_column] == right_df[right_column], "right"
        ),
        "right_anti": lambda: right_df.join(
            left_df, right_df[right_column] == left_df[left_column], "left_anti"
        ),
        "right_anti_v2": lambda: left_df.join(
            right_df, left_df[left_column] == right_df[right_column], "right"
        )
        .filter(left_df[left_column].isNull())
        .select(*right_df.columns),
        "outer": lambda: left_df.join(
            right_df, left_df[left_column] == right_df[right_column], "outer"
        ),
        "outer_anti": lambda: left_df.join(
            right_df, left_df[left_column] == right_df[right_column], "outer"
        ).filter(
            (left_df[left_column].isNotNull() & right_df[right_column].isNull())
            | (left_df[left_column].isNull() & right_df[right_column].isNotNull())
        ),
        "cross": lambda: left_df.crossJoin(right_df),
        "self": lambda: left_df.alias("table_one").join(
            left_df.alias("table_two"),
            F.col(f"table_one.{left_column}") == F.col(f"table_two.{right_column}"),
            "inner"
        ),
        "intersect": lambda: left_df.intersect(right_df),
        "except": lambda: left_df.exceptAll(right_df),
        "union": lambda: left_df.union(right_df),
        "union_all": lambda: left_df.unionByName(right_df, allowMissingColumns=True),  # tiene que ser los mismos nombres de columnas en ambos dataframes, sino toma la de la izquierda
    }

    # Validar que el tipo de join sea válido (revisando en el mapeo de alias)
    requested_join = join_type
    join_type = next(
        (key for key, aliases in join_aliases.items() if join_type in aliases), None
    )

    if not join_type:
        raise ValueError(f"Tipo de join '{requested_join}' no es válido.")

    if join_condition and isinstance(join_condition, str):
        # Si join_condition es una cadena, convertirla a expresión de columna de PySpark
        join_condition = F.expr(join_condition)

        joined_df = left_df.join(right_df, join_condition, join_type)

    elif left_column and right_column:
        # Ejecutar la operación de unión
        joined_df = join_operations[join_type]()

    if execution_plan:
        print(f"Query ejecutado:")
        joined_df.explain(True)  # Usar .explain(True) para obtener detalles del plan de ejecución

    # Retornar el resultado en Pandas o como DataFrame de Spark
    if return_pandas:
        return joined_df.toPandas()
    else:
        return joined_df

CLAUSES FOR SPARK SQL QUERIES
¶

Operación SQL Normal (ANSI SQL) Spark SQL DataFrame API
SELECT SELECT columna FROM tabla SELECT columna FROM tabla df.select("columna")
FROM SELECT * FROM tabla SELECT * FROM tabla df
WHERE SELECT * FROM tabla WHERE condicion SELECT * FROM tabla WHERE condicion df.filter("condicion")
GROUP BY SELECT columna, COUNT(*) FROM tabla GROUP BY columna SELECT columna, COUNT(*) FROM tabla GROUP BY columna df.groupBy("columna").count()
HAVING SELECT columna, COUNT(*) FROM tabla GROUP BY columna HAVING COUNT(*) > 10 SELECT columna, COUNT(*) FROM tabla GROUP BY columna HAVING COUNT(*) > 10 df.groupBy("columna").count().filter("count > 10")
JOIN (INNER) SELECT * FROM tabla1 INNER JOIN tabla2 ON tabla1.id = tabla2.id SELECT * FROM tabla1 INNER JOIN tabla2 ON tabla1.id = tabla2.id df1.join(df2, df1.id == df2.id)
JOIN (LEFT OUTER) SELECT * FROM tabla1 LEFT JOIN tabla2 ON tabla1.id = tabla2.id SELECT * FROM tabla1 LEFT JOIN tabla2 ON tabla1.id = tabla2.id df1.join(df2, df1.id == df2.id, "left")
JOIN (RIGHT OUTER) SELECT * FROM tabla1 RIGHT JOIN tabla2 ON tabla1.id = tabla2.id SELECT * FROM tabla1 RIGHT JOIN tabla2 ON tabla1.id = tabla2.id df1.join(df2, df1.id == df2.id, "right")
JOIN (FULL OUTER) SELECT * FROM tabla1 FULL OUTER JOIN tabla2 ON tabla1.id = tabla2.id SELECT * FROM tabla1 FULL OUTER JOIN tabla2 ON tabla1.id = tabla2.id df1.join(df2, df1.id == df2.id, "outer")
LEFT ANTI JOIN (Equivalente en ANSI SQL) SELECT * FROM tabla1 LEFT JOIN tabla2 ON tabla1.id = tabla2.id WHERE tabla2.id IS NULL SELECT * FROM tabla1 LEFT ANTI JOIN tabla2 ON tabla1.id = tabla2.id df1.join(df2, df1.id == df2.id, "left_anti")
UNION SELECT columna FROM tabla1 UNION SELECT columna FROM tabla2 SELECT columna FROM tabla1 UNION SELECT columna FROM tabla2 df1.union(df2)
UNION ALL SELECT columna FROM tabla1 UNION ALL SELECT columna FROM tabla2 SELECT columna FROM tabla1 UNION ALL SELECT columna FROM tabla2 df1.unionByName(df2)
INTERSECT SELECT columna FROM tabla1 INTERSECT SELECT columna FROM tabla2 SELECT columna FROM tabla1 INTERSECT SELECT columna FROM tabla2 df1.intersect(df2)
EXCEPT SELECT columna FROM tabla1 EXCEPT SELECT columna FROM tabla2 SELECT columna FROM tabla1 EXCEPT SELECT columna FROM tabla2 df1.exceptAll(df2)
EXCEPT SELECT columna FROM tabla1 EXCEPT SELECT columna FROM tabla2 SELECT columna FROM tabla1 EXCEPT SELECT columna FROM tabla2 df1.select("columna").subtract(df2.select("columna"))
AGGREGATE (SUM) SELECT SUM(columna) FROM tabla SELECT SUM(columna) FROM tabla df.agg({"columna": "sum"})
AGGREGATE (AVG) SELECT AVG(columna) FROM tabla SELECT AVG(columna) FROM tabla df.agg({"columna": "avg"})
AGGREGATE (MAX) SELECT MAX(columna) FROM tabla SELECT MAX(columna) FROM tabla df.agg({"columna": "max"})
AGGREGATE (MIN) SELECT MIN(columna) FROM tabla SELECT MIN(columna) FROM tabla df.agg({"columna": "min"})
AGGREGATE (COUNT) SELECT COUNT(*) FROM tabla SELECT COUNT(*) FROM tabla df.count()
DISTINCT SELECT DISTINCT columna FROM tabla SELECT DISTINCT columna FROM tabla df.select("columna").distinct()
ORDER BY SELECT * FROM tabla ORDER BY columna ASC SELECT * FROM tabla ORDER BY columna ASC df.orderBy("columna")
LIMIT SELECT * FROM tabla LIMIT 10 SELECT * FROM tabla LIMIT 10 df.limit(10)
OFFSET SELECT columna FROM tabla LIMIT 10 OFFSET 5 SELECT columna FROM tabla LIMIT 10 OFFSET 5 df.offset(5).limit(10)
CASE WHEN (IF ELSE) SELECT CASE WHEN columna > 10 THEN 'Alto' ELSE 'Bajo' END FROM tabla SELECT CASE WHEN columna > 10 THEN 'Alto' ELSE 'Bajo' END FROM tabla df.withColumn("nuevo_columna", when(df["columna"] > 10, "Alto").otherwise("Bajo"))
IS NULL SELECT * FROM tabla WHERE columna IS NULL SELECT * FROM tabla WHERE columna IS NULL df.filter(df["columna"].isNull())
IS NOT NULL SELECT * FROM tabla WHERE columna IS NOT NULL SELECT * FROM tabla WHERE columna IS NOT NULL df.filter(df["columna"].isNotNull())
CAST SELECT CAST(columna AS INT) FROM tabla SELECT CAST(columna AS INT) FROM tabla df.withColumn("columna", df["columna"].cast("int"))

DOWNLOAD DATASET Y CREATE PANDAS DATAFRAME¶

In [9]:
import pandas as pd
from pandas_dataset_handler import PandasDatasetHandler
import gc

# URL del archivo Parquet en GitHub
parquet_file = "https://raw.githubusercontent.com/JorgeCardona/data-collection-json-csv-sql/main/parquet/bulk_data_20.parquet"
pandas_df = PandasDatasetHandler.load_dataset(parquet_file)

parquet_file_name = "dataset.parquet"
pandas_df.to_parquet(parquet_file_name, index=False)

print(pandas_df.shape)
pandas_df.head()

# Liberar memoria eliminando el DataFrame
del pandas_df
gc.collect()  # Forzar recolección de basura

print("Pandas DataFrame eliminado de la memoria.")
File 'https://raw.githubusercontent.com/JorgeCardona/data-collection-json-csv-sql/main/parquet/bulk_data_20.parquet' successfully loaded as parquet.
(250000, 19)
Pandas DataFrame eliminado de la memoria.

SPARK SESSION
¶

In [10]:
# Example usage
app_name = "Delta Spark"
# Ruta para el directorio del metastore
base_dir = "./warehouse-spark"
warehouse_dir = f"{base_dir}/spark_catalog/database"
metastore_db_path = f"{base_dir}/metastore/metastore_db"

spark_session = create_spark_session(
    app_name=app_name, warehouse_dir=warehouse_dir, metastore_db_path=metastore_db_path
)

# Set this configuration before running your queries
spark_session.conf.set(
    "spark.sql.debug.maxToStringFields", "1000"
)  # Set this to a higher number

# Ajusta el número de particiones dinámicamente en base al número de núcleos disponibles
num_particiones = max(2, spark_session.sparkContext.defaultParallelism)  # Al menos 2 particiones, ajustado a los núcleos
spark_session.conf.set("spark.sql.shuffle.partitions", num_particiones)  # Ajusta el número de particiones según el entorno
spark_session.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true") # enable schema evolution
spark_session.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")#config to enable all new Delta tables with Change Data Feed
Warning: Ignoring non-Spark config property: javax.jdo.option.ConnectionURL
:: loading settings :: url = jar:file:/usr/local/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-be919257-68dd-426d-b191-ae5f79a813ce;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.1 in central
	found io.delta#delta-storage;3.2.1 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 149ms :: artifacts dl 5ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.2.1 from central in [default]
	io.delta#delta-storage;3.2.1 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-be919257-68dd-426d-b191-ae5f79a813ce
	confs: [default]
	0 artifacts copied, 3 already retrieved (0kB/6ms)
24/12/27 20:15:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
SparkSession created with Delta and persistent metastore at: ./warehouse-spark/spark_catalog/database

SPARK USER INTERFACE¶

CLICK HERE

CREATE SPARK DATAFRAME FROM A CLASSIC PARQUET FILE
¶

In [11]:
spark_dataframe_sample = spark_session.read.parquet(parquet_file_name)

# Almacenar en caché el DataFrame para evitar recalcularlo en operaciones subsecuentes, mejorando el rendimiento.
spark_dataframe_sample.cache()

spark_dataframe_sample.show(truncate=False)
                                                                                
+-----------------------------+--------------------------------------------------------+-------------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+-----------------+------------------------+--------------+--------------+--------------+-------------------------+------------+
|email                        |address                                                 |country                  |sex   |age|profession     |zodiac_sign|favorite_food|favorite_sport|favorite_movie_genre|favorite_animal|preferred_language|hobby            |favorite_tv_show        |favorite_color|favorite_drink|favorite_music|favorite_technology      |favorite_car|
+-----------------------------+--------------------------------------------------------+-------------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+-----------------+------------------------+--------------+--------------+--------------+-------------------------+------------+
|kathryn69@example.org        |060 Steven Row Apt. 380\nTimothystad, WI 93595          |Guam                     |Male  |38 |Pilot          |Aquarius   |Sushi        |Parkour       |Superhero           |Snake          |Serbian           |Cycling          |Ozark                   |Sapphire      |Soda          |Indie         |Containers               |Lamborghini |
|mallory01@example.org        |65236 Mcclure Avenue Apt. 710\nBrendaside, CA 81540     |Zambia                   |Male  |26 |Journalist     |Taurus     |Cheesecake   |Lacrosse      |Epic                |Seal           |Dutch             |Fishing          |Breaking Bad            |Chocolate     |Smoothie      |R&B           |Data Warehousing         |Audi        |
|tristankent@example.com      |966 Deborah Drive Suite 791\nNorth Mary, AL 20500       |Korea                    |Female|69 |Mechanic       |Aquarius   |Burrito      |Golf          |Romance             |Fox            |Romanian          |Origami          |The Boys                |Scarlet       |Coffee        |Experimental  |DevOps                   |Hyundai     |
|russelljohn@example.com      |745 Fitzgerald Parkway Apt. 122\nSheafort, MH 96479     |Montserrat               |Male  |79 |Translator     |Pisces     |Lamb Chops   |Figure Skating|Mockumentary        |Frog           |Lithuanian        |Reading          |Arrow                   |Teal          |Chai          |Blues         |Blockchain               |BMW         |
|scarlson@example.com         |285 Scott Extensions Suite 651\nSouth Ronald, AS 17445  |Greece                   |Male  |71 |Social Worker  |Aries      |Shawarma     |Swimming      |Experimental        |Jellyfish      |Portuguese        |Learning         |Fargo                   |Maroon        |Matcha        |Opera         |Green Computing          |Lamborghini |
|middletonmargaret@example.net|3256 Jennifer Passage\nWest Denisemouth, NE 15122       |Gambia                   |Male  |88 |Artist         |Cancer     |Fajitas      |Taekwondo     |Disaster            |Antelope       |Arabic            |Fitness          |The Mandalorian         |Indigo        |Kombucha      |Zouk          |Nanotechnology           |Land Rover  |
|twallace@example.net         |903 Nicholas Cove Apt. 156\nSouth Alexland, ND 50885    |Jordan                   |Male  |27 |Architect      |Gemini     |Hot Dog      |Soccer        |Parody              |Chimpanzee     |Welsh             |Astronomy        |House of Cards          |Peach         |Americano     |Latin         |Containers               |Aston Martin|
|martinmark@example.com       |12268 Daniel Haven Suite 128\nThomasside, NM 81208      |South Africa             |Male  |55 |Social Worker  |Sagittarius|Ice Cream    |Rowing        |Silent Film         |Octopus        |Swahili           |Stargazing       |Succession              |Chocolate     |Lemonade      |House         |Containers               |Nissan      |
|jfreeman@example.org         |704 Randall Plains Suite 752\nNorth Donaldfurt, SC 99060|Germany                  |Female|79 |Teacher        |Scorpio    |Lasagna      |Handball      |Musical             |Bear           |Spanish           |Dancing          |The Flash               |Brown         |Black Tea     |Lo-fi         |Edge AI                  |Buick       |
|whitekristen@example.org     |46919 Joseph Stravenue\nNorth Michael, IN 92016         |Norfolk Island           |Male  |65 |Electrician    |Aries      |Chicken Wings|Rugby         |Period Piece        |Sheep          |Portuguese        |Origami          |CSI                     |Chartreuse    |Soda          |Classical     |Autonomous Vehicles      |Kia         |
|nicolerose@example.org       |8419 Howard Shoals Apt. 078\nOwensshire, OK 96297       |Italy                    |Female|48 |NULL           |Virgo      |Eclairs      |Skiing        |Independent         |Fox            |Estonian          |Playing Cards    |Buffy the Vampire Slayer|Yellow        |Soda          |Cumbia        |Augmented Reality        |Mitsubishi  |
|mary11@example.com           |8519 Brian Glen\nNorth Dawn, PR 77205                   |Canada                   |Female|95 |Bartender      |Taurus     |Pizza        |Bowling       |Disaster            |Penguin        |Arabic            |Gaming           |Westworld               |Orange        |Green Tea     |Bolero        |Data Lakes               |Mazda       |
|davidfernandez@example.net   |49951 Robert Lock\nNew Amy, IA 33498                    |Paraguay                 |Male  |23 |Entrepreneur   |Sagittarius|Empanadas    |Shooting      |Comedy              |Chimpanzee     |Indonesian        |Language Learning|Vikings                 |Bronze        |Tonic Water   |Electronic    |Data Lakes               |SEAT        |
|sharon37@example.org         |PSC 8736, Box 3940\nAPO AE 31112                        |Saint Pierre and Miquelon|Male  |32 |Designer       |Capricorn  |Dumplings    |Weightlifting |Romantic Comedy     |Buffalo        |Welsh             |Language Learning|Lucifer                 |Yellow        |Sangria       |Metal         |Brain-Computer Interfaces|Audi        |
|brandon67@example.com        |25491 Fritz Club\nMoonbury, ID 23063                    |Grenada                  |Female|47 |Content Creator|Leo        |Seafood      |Baseball      |Experimental        |Jellyfish      |Japanese          |Kayaking         |This Is Us              |Fuchsia       |Cocktail      |Post-Rock     |Edge AI                  |Maserati    |
|brandon32@example.org        |27342 Christine Club\nSouth Cindy, SD 97227             |Bhutan                   |Male  |85 |Athlete        |Leo        |Fried Rice   |Polo          |Steampunk           |Lizard         |Russian           |Model Building   |Grey's Anatomy          |Ivory         |Whiskey Sour  |Country       |5G                       |Ford        |
|westthomas@example.org       |007 Joseph Lights\nHerbertmouth, KS 92526               |Suriname                 |Male  |28 |Game Developer |Virgo      |Spaghetti    |Surfing       |Noir                |Monkey         |Polish            |Geocaching       |Doctor Who              |Rose          |Margarita     |Synthwave     |Microservices            |Bentley     |
|felicia31@example.net        |46124 Rivas Dale Apt. 053\nMasonborough, OR 28373       |Montserrat               |Male  |56 |Accountant     |Pisces     |Fajitas      |Snooker       |Documentary         |Bat            |Turkish           |Photography      |The Office              |Amethyst      |Wine          |World Music   |Data Warehousing         |Skoda       |
|ofreeman@example.org         |93184 Allen Spurs Apt. 514\nEast Jocelynberg, NJ 55288  |Spain                    |Male  |65 |Actor          |Gemini     |Eclairs      |Archery       |Experimental        |Cat            |Greek             |Gardening        |NCIS                    |Plum          |Cocktail      |Post-Rock     |Synthetic Data           |Rimac       |
|cannonrachael@example.net    |9786 Heather Forest\nSouth Pedroshire, NE 52237         |Burundi                  |Female|41 |Content Creator|Leo        |Pho          |Equestrian    |Historical          |Turtle         |Japanese          |Snorkeling       |Westworld               |Silver        |Kombucha      |Hip-Hop       |Serverless Computing     |Mitsubishi  |
+-----------------------------+--------------------------------------------------------+-------------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+-----------------+------------------------+--------------+--------------+--------------+-------------------------+------------+
only showing top 20 rows

SPLIT DATAFRAME

In [12]:
# divide el dataframe
num_parts = 2
spark_dataframe_sql, spark_dataframe_delta = split_spark_dataframe(spark_dataframe_sample, num_parts)

# Almacenar en caché el DataFrame para evitar recalcularlo en operaciones subsecuentes, mejorando el rendimiento.
spark_dataframe_sql.cache()
spark_dataframe_delta.cache()

spark_dataframe_sql.show()
spark_dataframe_delta.show()
Successfully created 2 DataFrames.
                                                                                
+--------------------+--------------------+--------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+-----------------+--------------------+--------------+--------------+--------------+--------------------+------------+
|               email|             address|             country|   sex|age|     profession|zodiac_sign|favorite_food|favorite_sport|favorite_movie_genre|favorite_animal|preferred_language|            hobby|    favorite_tv_show|favorite_color|favorite_drink|favorite_music| favorite_technology|favorite_car|
+--------------------+--------------------+--------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+-----------------+--------------------+--------------+--------------+--------------+--------------------+------------+
|kathryn69@example...|060 Steven Row Ap...|                Guam|  Male| 38|          Pilot|   Aquarius|        Sushi|       Parkour|           Superhero|          Snake|           Serbian|          Cycling|               Ozark|      Sapphire|          Soda|         Indie|          Containers| Lamborghini|
|mallory01@example...|65236 Mcclure Ave...|              Zambia|  Male| 26|     Journalist|     Taurus|   Cheesecake|      Lacrosse|                Epic|           Seal|             Dutch|          Fishing|        Breaking Bad|     Chocolate|      Smoothie|           R&B|    Data Warehousing|        Audi|
|tristankent@examp...|966 Deborah Drive...|               Korea|Female| 69|       Mechanic|   Aquarius|      Burrito|          Golf|             Romance|            Fox|          Romanian|          Origami|            The Boys|       Scarlet|        Coffee|  Experimental|              DevOps|     Hyundai|
|russelljohn@examp...|745 Fitzgerald Pa...|          Montserrat|  Male| 79|     Translator|     Pisces|   Lamb Chops|Figure Skating|        Mockumentary|           Frog|        Lithuanian|          Reading|               Arrow|          Teal|          Chai|         Blues|          Blockchain|         BMW|
|scarlson@example.com|285 Scott Extensi...|              Greece|  Male| 71|  Social Worker|      Aries|     Shawarma|      Swimming|        Experimental|      Jellyfish|        Portuguese|         Learning|               Fargo|        Maroon|        Matcha|         Opera|     Green Computing| Lamborghini|
|middletonmargaret...|3256 Jennifer Pas...|              Gambia|  Male| 88|         Artist|     Cancer|      Fajitas|     Taekwondo|            Disaster|       Antelope|            Arabic|          Fitness|     The Mandalorian|        Indigo|      Kombucha|          Zouk|      Nanotechnology|  Land Rover|
|twallace@example.net|903 Nicholas Cove...|              Jordan|  Male| 27|      Architect|     Gemini|      Hot Dog|        Soccer|              Parody|     Chimpanzee|             Welsh|        Astronomy|      House of Cards|         Peach|     Americano|         Latin|          Containers|Aston Martin|
|martinmark@exampl...|12268 Daniel Have...|        South Africa|  Male| 55|  Social Worker|Sagittarius|    Ice Cream|        Rowing|         Silent Film|        Octopus|           Swahili|       Stargazing|          Succession|     Chocolate|      Lemonade|         House|          Containers|      Nissan|
|jfreeman@example.org|704 Randall Plain...|             Germany|Female| 79|        Teacher|    Scorpio|      Lasagna|      Handball|             Musical|           Bear|           Spanish|          Dancing|           The Flash|         Brown|     Black Tea|         Lo-fi|             Edge AI|       Buick|
|whitekristen@exam...|46919 Joseph Stra...|      Norfolk Island|  Male| 65|    Electrician|      Aries|Chicken Wings|         Rugby|        Period Piece|          Sheep|        Portuguese|          Origami|                 CSI|    Chartreuse|          Soda|     Classical| Autonomous Vehicles|         Kia|
|nicolerose@exampl...|8419 Howard Shoal...|               Italy|Female| 48|           NULL|      Virgo|      Eclairs|        Skiing|         Independent|            Fox|          Estonian|    Playing Cards|Buffy the Vampire...|        Yellow|          Soda|        Cumbia|   Augmented Reality|  Mitsubishi|
|  mary11@example.com|8519 Brian Glen\n...|              Canada|Female| 95|      Bartender|     Taurus|        Pizza|       Bowling|            Disaster|        Penguin|            Arabic|           Gaming|           Westworld|        Orange|     Green Tea|        Bolero|          Data Lakes|       Mazda|
|davidfernandez@ex...|49951 Robert Lock...|            Paraguay|  Male| 23|   Entrepreneur|Sagittarius|    Empanadas|      Shooting|              Comedy|     Chimpanzee|        Indonesian|Language Learning|             Vikings|        Bronze|   Tonic Water|    Electronic|          Data Lakes|        SEAT|
|sharon37@example.org|PSC 8736, Box 394...|Saint Pierre and ...|  Male| 32|       Designer|  Capricorn|    Dumplings| Weightlifting|     Romantic Comedy|        Buffalo|             Welsh|Language Learning|             Lucifer|        Yellow|       Sangria|         Metal|Brain-Computer In...|        Audi|
|brandon67@example...|25491 Fritz Club\...|             Grenada|Female| 47|Content Creator|        Leo|      Seafood|      Baseball|        Experimental|      Jellyfish|          Japanese|         Kayaking|          This Is Us|       Fuchsia|      Cocktail|     Post-Rock|             Edge AI|    Maserati|
|brandon32@example...|27342 Christine C...|              Bhutan|  Male| 85|        Athlete|        Leo|   Fried Rice|          Polo|           Steampunk|         Lizard|           Russian|   Model Building|      Grey's Anatomy|         Ivory|  Whiskey Sour|       Country|                  5G|        Ford|
|westthomas@exampl...|007 Joseph Lights...|            Suriname|  Male| 28| Game Developer|      Virgo|    Spaghetti|       Surfing|                Noir|         Monkey|            Polish|       Geocaching|          Doctor Who|          Rose|     Margarita|     Synthwave|       Microservices|     Bentley|
|felicia31@example...|46124 Rivas Dale ...|          Montserrat|  Male| 56|     Accountant|     Pisces|      Fajitas|       Snooker|         Documentary|            Bat|           Turkish|      Photography|          The Office|      Amethyst|          Wine|   World Music|    Data Warehousing|       Skoda|
|ofreeman@example.org|93184 Allen Spurs...|               Spain|  Male| 65|          Actor|     Gemini|      Eclairs|       Archery|        Experimental|            Cat|             Greek|        Gardening|                NCIS|          Plum|      Cocktail|     Post-Rock|      Synthetic Data|       Rimac|
|cannonrachael@exa...|9786 Heather Fore...|             Burundi|Female| 41|Content Creator|        Leo|          Pho|    Equestrian|          Historical|         Turtle|          Japanese|       Snorkeling|           Westworld|        Silver|      Kombucha|       Hip-Hop|Serverless Computing|  Mitsubishi|
+--------------------+--------------------+--------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+-----------------+--------------------+--------------+--------------+--------------+--------------------+------------+
only showing top 20 rows

[Stage 11:===========================================>              (3 + 1) / 4]
+--------------------+--------------------+--------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+--------------+--------------------+--------------+--------------+--------------+--------------------+-------------+
|               email|             address|             country|   sex|age|     profession|zodiac_sign|favorite_food|favorite_sport|favorite_movie_genre|favorite_animal|preferred_language|         hobby|    favorite_tv_show|favorite_color|favorite_drink|favorite_music| favorite_technology| favorite_car|
+--------------------+--------------------+--------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+--------------+--------------------+--------------+--------------+--------------+--------------------+-------------+
|  dawn38@example.net|14388 Li Flat\nSo...|                Togo|Female| 22|   Entrepreneur|      Libra|        Ramen|        Boxing|     Courtroom Drama|         Donkey|           Italian|  Scrapbooking|             Lucifer|       Crimson|          NULL|        Gospel|            Robotics|         Opel|
|xballard@example.org|20007 Sosa Statio...|               Korea|  Male| 81|           NULL|     Gemini|      Gnocchi|        Tennis|          Teen Drama|         Rabbit|             Tamil|   Photography|             Lucifer|          Gold|  Nitro Coffee|         Indie| Zero Trust Security|  Rolls-Royce|
|charles72@example...|8950 Shawn Juncti...|                Togo|Female| 68|    Firefighter|     Pisces|         NULL|   Paragliding|     Magical Realism|       Kangaroo|          Georgian|    Meditation|          This Is Us|          NULL|        Matcha|        Reggae|          Containers|         NULL|
| adaniel@example.net|58658 Frazier Jun...|       Liechtenstein|  Male| 31|  Web Developer|      Aries|      Burrito|      Baseball|                 Spy|       Starfish|         Slovenian|  Scrapbooking|            Homeland|        Yellow|      Daiquiri|        Cumbia|    Data Warehousing|        Isuzu|
|hedwards@example.net|7507 Jessica Tunn...|         Puerto Rico|Female| 64|       Musician|     Cancer|Chicken Wings|       Archery|         Independent|        Penguin|              Thai|       Pottery|           The Crown|       Scarlet|     Cold Brew|      Shoegaze|    Machine Learning|        Mazda|
|nicholas88@exampl...|Unit 2882 Box 228...|       French Guiana|  Male| 29|     Researcher|     Taurus|      Fajitas|       Parkour|     Fantasy Romance|          Koala|         Ukrainian|      Canoeing|         The Witcher|        Salmon|  Nitro Coffee|        Reggae|                 IoT|        Isuzu|
|carpentertodd@exa...|17382 Michael Fal...|              Guyana|Female| 38|     Astronomer|    Scorpio|    Empanadas|          NULL|           Animation|          Moose|           Italian|  Volunteering|             Mad Men|          NULL|     Milkshake|          NULL|                 IoT|    SsangYong|
|nicholasreyes@exa...|5848 Williams Tra...|         Puerto Rico|  Male| 81|      Policeman|      Aries|    Empanadas|        Karate|                Epic|           Goat|            German|  Wine Tasting|        The Sopranos|         Brown|     Margarita|        Gospel|Serverless Computing|         Fiat|
|stonemichael@exam...|PSC 1170, Box 451...|         Isle of Man|Female| 46|    Firefighter|     Taurus|        Pizza| Speed Skating|           Biography|      Crocodile|            French| Bird Watching|              Narcos|         White| Hot Chocolate|          Soul|           Metaverse|         Mini|
|deborahjacobs@exa...|3899 Li Ranch Sui...|          Mozambique|  Male| 73|    Electrician|      Virgo|      Tamales|        Skiing|         Silent Film|        Giraffe|             Uzbek|       Fishing|        Black Mirror|        Purple|   Tonic Water|     Afrobeats|      Bioinformatics| Aston Martin|
|gcummings@example...|31591 Williams La...|             Bahamas|  Male| 22|        Dentist|      Libra|   Fried Rice|    Ice Hockey|              Sci-Fi|         Parrot|         Icelandic|       Pottery|        Breaking Bad|         Brown|         Mocha|         Samba|              DevOps|     Maserati|
| bruce62@example.net|875 Walker Manors...|Slovakia (Slovak ...|  Male| 25|     Accountant|     Gemini| Potato Salad|       Curling|             Fantasy|          Sheep|            Polish|       Dancing|               Ozark|       Fuchsia|          Wine|       Country|   Augmented Reality|         Audi|
|zhangkaren@exampl...|Unit 7845 Box 742...|              Serbia|  Male| 47|        Athlete|    Scorpio|  Onion Rings|    Volleyball|     Magical Realism|          Koala|           Spanish|   Woodworking|How I Met Your Mo...|       Scarlet|         Water|       Country|      Synthetic Data|          BMW|
| uromero@example.net|5462 Jenna Mills\...|               Japan|Female| 36|      Architect|     Taurus|    Ice Cream|  Snowboarding|            Suspense|            Cat|            Pashto|       Origami|            Homeland|          Lime| Gin and Tonic|     Synthwave|       Digital Twins|      McLaren|
|kthompson@example...|03287 Silva Pike\...|           Hong Kong|Female| 93|     Consultant|     Gemini|         Soup| Speed Skating|                 Spy|           Lion|            Hebrew|Puzzle Solving|Buffy the Vampire...|        Orange|     Root Beer|       Dubstep|Brain-Computer In...|        Honda|
|   uyang@example.org|02975 Travis Isle...|Svalbard & Jan Ma...|Female| 71|        Athlete|     Pisces|      Gnocchi|        Soccer|          Historical|          Shark|           Italian|  Volunteering|          The Office|        Yellow|     Root Beer|     Synthwave|      Synthetic Data|      Citroën|
|danielmichael@exa...|Unit 4088 Box 513...|             Nigeria|  Male| 70|Chef de Cuisine|      Aries|        Curry|     Wrestling|           Superhero|       Starfish|        Indonesian|   Board Games|                NCIS|      Charcoal|         Latte|           Ska|Privacy-Preservin...|      Ferrari|
|nortonchelsea@exa...|6295 Heidi Harbor...|      American Samoa|Female| 86|  Civil Servant|   Aquarius| Potato Salad|        Hockey|              Satire|           Bird|          Estonian|      Knitting|        The Sopranos|      Lavender|          Wine|     Post-Rock| Agile Methodologies|         Jeep|
| bcortez@example.org|30448 White Ville...|            Slovenia|Female| 34|      Economist|        Leo|      Risotto|      Lacrosse|                Epic|       Flamingo|           Turkish|   Woodworking|        Black Mirror|         Ivory|        Matcha|           EDM| Agile Methodologies|Mercedes-Benz|
|richardsdrew@exam...|78571 Rebecca Lan...|               Yemen|  Male| 95|         Writer|      Libra|      Burrito|         Rugby|          Road Movie|        Leopard|             Irish|  Scuba Diving|           The Crown|      Amethyst|          Milk|        Techno|       Microservices|         Jeep|
+--------------------+--------------------+--------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+--------------+--------------------+--------------+--------------+--------------+--------------------+-------------+
only showing top 20 rows

                                                                                

CREATE DATABASE

In [13]:
# Ejemplo de uso
database_name = "delta_spark_database"
create_database(spark_session, database_name)

print()

list_all_databases_and_tables(spark_session)
Database 'delta_spark_database' created or already exists.

The following databases and tables are present in the Spark Catalog.

Out[13]:
{'default': [], 'delta_spark_database': []}

SAVE DELTA TABLE USING SPARK SQL
¶

In [14]:
database_sql = "default"
table_sql = "delta_sql"
dataframe_sql = spark_dataframe_sql
sql_warehouse_dir = warehouse_dir
partition_by = ['zodiac_sign']

create_delta_table_with_spark_dataframe_and_register(
    spark_session, database_sql, table_sql, dataframe_sql, sql_warehouse_dir, partition_by
)
24/12/27 20:15:17 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                
Temporary files at './warehouse-spark/spark_catalog/database/default/delta_sql' have been deleted.
Table 'delta_sql' created and registered at './warehouse-spark/spark_catalog/database/default/delta_sql' in database 'default' with partitioning by ['zodiac_sign'].

SAVE DELTA TABLE FROM A SPARK DATAFRAME
¶

In [15]:
database_delta = "delta_spark_database"
table_delta_1 = "delta_dataframe_1"
partition_by = ['zodiac_sign']
table_delta_2 = "delta_dataframe_2"
dataframe_delta_1 = spark_dataframe_delta
dataframe_delta_2 = spark_dataframe_sql
delta_warehouse_dir = warehouse_dir

create_delta_table_in_database(
    spark_session, database_delta, table_delta_1, dataframe_delta_1, delta_warehouse_dir, partition_by
)
create_delta_table_in_database(
    spark_session, database_delta, table_delta_2, dataframe_delta_2, delta_warehouse_dir
)
Table 'delta_spark_database.delta_dataframe_1' dropped.
                                                                                
Property 'delta.enableChangeDataFeed' enabled for table 'delta_spark_database.delta_dataframe_1'.
Table 'delta_dataframe_1' created in database 'delta_spark_database' at './warehouse-spark/spark_catalog/database/delta_spark_database/delta_dataframe_1' with partitioning by ['zodiac_sign'].
Table 'delta_spark_database.delta_dataframe_2' dropped.
                                                                                
Property 'delta.enableChangeDataFeed' enabled for table 'delta_spark_database.delta_dataframe_2'.
Table 'delta_dataframe_2' created in database 'delta_spark_database' at './warehouse-spark/spark_catalog/database/delta_spark_database/delta_dataframe_2' with partitioning by none.

SAVE DATAFRAME AS DELTA PARQUET
¶

In [16]:
parquet_dataframe = spark_dataframe_sample
file_name = "spark_dataframe_delta"
file_path_delta = f"{base_dir}/spark_files/delta_parquet"
partition_by = ['zodiac_sign']

save_dataframe_as_delta_parquet(spark_dataframe_sample, file_name, file_path_delta, partition_by)
24/12/27 20:15:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/12/27 20:15:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/12/27 20:15:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/12/27 20:15:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/12/27 20:15:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/12/27 20:15:33 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/12/27 20:15:33 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/12/27 20:15:33 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/12/27 20:15:33 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                
DataFrame saved as Delta Parquet at './warehouse-spark/spark_files/delta_parquet/spark_dataframe_delta' with partitioning by ['zodiac_sign'].

SAVE DATAFRAME AS CLASSIC PARQUET (SINGLE FILE)
¶

In [17]:
parquet_dataframe = spark_dataframe_sample
file_name_compact = "spark_dataframe_compact"
file_path_compact_parquet = f"{base_dir}/spark_files/classic_parquet/parquet_clasico/parque_compacto"

save_dataframe_as_parquet(spark_dataframe_sample, file_name_compact, file_path_compact_parquet)
[Stage 81:===========================================>              (3 + 1) / 4]
DataFrame saved as Delta Parquet at './warehouse-spark/spark_files/classic_parquet/parquet_clasico/parque_compacto/spark_dataframe_compact' with partitioning by None.
                                                                                

SAVE DATAFRAME AS PARTITIONED CLASSIC PARQUET
¶

In [18]:
parquet_dataframe = spark_dataframe_sample
file_name = "spark_dataframe_patitioned"
file_path_partitioned_parquet = f"{base_dir}/spark_files/classic_parquet/parquet_clasico/parquet_particionado"
partition_by = ['zodiac_sign']

save_dataframe_as_parquet(spark_dataframe_sample, file_name, file_path_partitioned_parquet)
[Stage 82:===========================================>              (3 + 1) / 4]
DataFrame saved as Delta Parquet at './warehouse-spark/spark_files/classic_parquet/parquet_clasico/parquet_particionado/spark_dataframe_patitioned' with partitioning by None.
                                                                                

READ PARQUET FILES
¶

DELTA PARQUET¶

In [19]:
delta_parquet = read_parquet_or_delta_file(spark_session, file_path_delta)
display(delta_parquet.toPandas())

del delta_parquet
gc.collect()
Successfully read Parquet file from './warehouse-spark/spark_files/delta_parquet'.
email address country sex age profession favorite_food favorite_sport favorite_movie_genre favorite_animal preferred_language hobby favorite_tv_show favorite_color favorite_drink favorite_music favorite_technology favorite_car zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Out[19]:
20

REGULAR PARQUET¶

COMPACT PARQUET¶

In [20]:
file_parquet = read_parquet_or_delta_file(spark_session, file_path_compact_parquet)
display(file_parquet.toPandas())

del file_parquet
gc.collect()
Successfully read Parquet file from './warehouse-spark/spark_files/classic_parquet/parquet_clasico/parque_compacto'.
                                                                                
email address country sex age profession zodiac_sign favorite_food favorite_sport favorite_movie_genre favorite_animal preferred_language hobby favorite_tv_show favorite_color favorite_drink favorite_music favorite_technology favorite_car
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Out[20]:
29

PARTITIONED PARQUET¶

In [21]:
file_parquet = read_parquet_or_delta_file(spark_session, file_path_partitioned_parquet)
display(file_parquet.toPandas())

del file_parquet 
gc.collect()
Successfully read Parquet file from './warehouse-spark/spark_files/classic_parquet/parquet_clasico/parquet_particionado'.
                                                                                
email address country sex age profession zodiac_sign favorite_food favorite_sport favorite_movie_genre favorite_animal preferred_language hobby favorite_tv_show favorite_color favorite_drink favorite_music favorite_technology favorite_car
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Out[21]:
20

LIST THE TABLES IN THE DATABASE

In [22]:
list_all_databases_and_tables(spark_session)
The following databases and tables are present in the Spark Catalog.

Database: default, Table: delta_sql

Database: delta_spark_database, Table: delta_dataframe_1

Database: delta_spark_database, Table: delta_dataframe_2

Out[22]:
{'default': ['delta_sql'],
 'delta_spark_database': ['delta_dataframe_1', 'delta_dataframe_2']}

CONSULTAS¶

SELECT, WHERE-FILTER, LIMIT

Create Queries Using Spark SQL¶

In [23]:
database = "default"
table = "delta_sql"

query = f"""
SELECT email, hobby, country 
FROM {database}.{table} 
WHERE AGE <= 18 
LIMIT 5
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.show(truncate=False)
spark_dataframe.toPandas()
+---------------------------+---------+--------------+
|email                      |hobby    |country       |
+---------------------------+---------+--------------+
|esparzalouis@example.net   |Traveling|Timor-Leste   |
|melinda87@example.net      |Gaming   |Montserrat    |
|fnelson@example.net        |Knitting |American Samoa|
|jenniferjohnson@example.net|Chess    |New Zealand   |
|vmiller@example.net        |Antiquing|Egypt         |
+---------------------------+---------+--------------+

Out[23]:
email hobby country
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [24]:
from pyspark.sql.functions import col

# Aplicar el filtro directamente sobre el DataFrame
filtered_dataframe = (
    spark_dataframe_sql.select("email", "hobby", "country").filter(col("AGE") <= 18).limit(5)
)

# Mostrar los resultados
filtered_dataframe.toPandas()
Out[24]:
email hobby country
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
In [25]:
# Apply the Filter Directly on the DataFrame
filtered_dataframe = (
    spark_dataframe_sql.select("email", "hobby", "country").where(spark_dataframe_sql["AGE"] <= 18).limit(5)
)

# Display the Results
filtered_dataframe.toPandas()
Out[25]:
email hobby country
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

CTE, SELECT, WHERE, BETWEEN, GROUP BY, HAVING, ORDER BY, LIMIT, OFFSET

In [26]:
database = "delta_spark_database"
table = "delta_dataframe_1"

query = f"""
WITH FOURTY AS (
    SELECT * 
    FROM {database}.{table}
    WHERE AGE <> 0
    AND AGE BETWEEN 1 AND 99
)

SELECT age, country, zodiac_sign, count(*) as TOTAL
FROM FOURTY
GROUP BY age, country, zodiac_sign
HAVING TOTAL > 0
ORDER BY TOTAL DESC, AGE DESC, COUNTRY
LIMIT 13
OFFSET 31
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[26]:
age country zodiac_sign TOTAL
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark SQL¶

In [27]:
database = "delta_spark_database"
table = "delta_dataframe_1"

query = f"""
WITH FOURTY AS (
    SELECT * 
    FROM {database}.{table}
    WHERE AGE <> 0
    AND AGE BETWEEN 1 AND 99
)

SELECT age, country, zodiac_sign, count(*) as TOTAL
FROM FOURTY
GROUP BY age, country, zodiac_sign
HAVING TOTAL > 0
ORDER BY TOTAL DESC, AGE DESC, COUNTRY
LIMIT 13
OFFSET 31
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[27]:
age country zodiac_sign TOTAL
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [28]:
# Single Block of Operations
result_df = (
    spark_dataframe_delta.filter(
        (spark_dataframe_delta["AGE"] > 0) & (spark_dataframe_delta["AGE"].between(1, 99))
    )  # Filtrar filas donde AGE <> 0 y AGE BETWEEN 1 AND 99
    .groupBy("age", "country", "zodiac_sign")  # Agrupar por age, country y zodiac_sign
    .count()  # Contar las filas en cada grupo
    .withColumnRenamed("count", "TOTAL")  # Renombrar la columna 'count' a 'TOTAL'
    .filter("TOTAL > 0")  # Filtrar grupos donde TOTAL > 0 (HAVING en SQL)
    .orderBy(
        ["TOTAL", "age", "country"], ascending=[False, False, True]
    )  # Ordenar por TOTAL y age en orden descendente y por country en orden Ascendente
    .offset(31)  # Saltar las primeras 31 filas
    .limit(13)  # Limitar el resultado a 31 filas
)

# Display the Results
result_df.toPandas()
Out[28]:
age country zodiac_sign TOTAL
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

AGGREGATE FUNCTIONS

Create Queries Using Spark SQL¶

In [29]:
database = "delta_spark_database"
table = "delta_dataframe_1"

query = f"""
SELECT
    SUM(age) AS total_age,
    AVG(age) AS average_age,
    COUNT(age) AS total_records,
    MAX(age) AS max_age,
    MIN(age) AS min_age,
    APPROX_COUNT_DISTINCT(age) AS distinct_ages,
    STDDEV(age) AS stddev_age,
    VARIANCE(age) AS variance_age,
    SKEWNESS(age) AS skewness_age,
    KURTOSIS(age) AS kurtosis_age,
    COLLECT_LIST(age) AS age_list,
    COLLECT_SET(age) AS unique_ages
FROM {database}.{table}
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[29]:
total_age average_age total_records max_age min_age distinct_ages stddev_age variance_age skewness_age kurtosis_age age_list unique_ages
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [30]:
from pyspark.sql.functions import (
    sum as spark_sum, avg, count, max as spark_max, min as spark_min, 
    approx_count_distinct, stddev, variance, skewness, kurtosis, collect_list, collect_set
)

spark_dataframe_delta.agg(
    spark_sum("age").alias("total_age"),
    avg("age").alias("average_age"),
    count("age").alias("total_records"),
    spark_max("age").alias("max_age"),
    spark_min("age").alias("min_age"),
    approx_count_distinct("age").alias("distinct_ages"),
    stddev("age").alias("stddev_age"),
    variance("age").alias("variance_age"),
    skewness("age").alias("skewness_age"),
    kurtosis("age").alias("kurtosis_age"),
    collect_list("age").alias("age_list"),
    collect_set("age").alias("unique_ages")
).toPandas()
Out[30]:
total_age average_age total_records max_age min_age distinct_ages stddev_age variance_age skewness_age kurtosis_age age_list unique_ages
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

CASE WHEN - THEN

Create Queries Using Spark SQL¶

In [31]:
database = "delta_spark_database"
table = "delta_dataframe_1"

query = f"""
SELECT
    CASE 
        WHEN age < 1 THEN 'Newborn'
        WHEN age >= 1 AND age < 3 THEN 'Infant'
        WHEN age >= 3 AND age < 5 THEN 'Toddler'
        WHEN age >= 5 AND age < 7 THEN 'Preschooler'
        WHEN age >= 7 AND age < 10 THEN 'Early School Age'
        WHEN age >= 10 AND age < 14 THEN 'Pre-Adolescent'
        WHEN age >= 14 AND age < 18 THEN 'Teenager'
        WHEN age >= 18 AND age < 28 THEN 'Young Adult'
        WHEN age >= 28 AND age < 40 THEN 'Adult'
        WHEN age >= 40 AND age < 50 THEN 'Midlife Adult'
        WHEN age >= 50 AND age < 60 THEN 'Experienced Adult'
        WHEN age >= 60 AND age < 70 THEN 'Mature Adult'
        ELSE 'Senior'
    END AS age_category,
    *
FROM {database}.{table}
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[31]:
age_category email address country sex age profession zodiac_sign favorite_food favorite_sport favorite_movie_genre favorite_animal preferred_language hobby favorite_tv_show favorite_color favorite_drink favorite_music favorite_technology favorite_car
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [32]:
import pyspark.sql.functions as F

spark_dataframe_delta_case = spark_dataframe_delta.withColumn(
    "age_category",
    F.when(spark_dataframe_delta["age"] < 1, "Newborn")
    .when((spark_dataframe_delta["age"] >= 1) & (spark_dataframe_delta["age"] < 3), "Infant")
    .when((spark_dataframe_delta["age"] >= 3) & (spark_dataframe_delta["age"] < 5), "Toddler")
    .when((spark_dataframe_delta["age"] >= 5) & (spark_dataframe_delta["age"] < 7), "Preschooler")
    .when((spark_dataframe_delta["age"] >= 7) & (spark_dataframe_delta["age"] < 10), "Early School Age")
    .when((spark_dataframe_delta["age"] >= 10) & (spark_dataframe_delta["age"] < 14), "Pre-Adolescent")
    .when((spark_dataframe_delta["age"] >= 14) & (spark_dataframe_delta["age"] < 18), "Teenager")
    .when((spark_dataframe_delta["age"] >= 18) & (spark_dataframe_delta["age"] < 28), "Young Adult")
    .when((spark_dataframe_delta["age"] >= 28) & (spark_dataframe_delta["age"] < 40), "Adult")
    .when((spark_dataframe_delta["age"] >= 40) & (spark_dataframe_delta["age"] < 50), "Midlife Adult")
    .when((spark_dataframe_delta["age"] >= 50) & (spark_dataframe_delta["age"] < 60), "Experienced Adult")
    .when((spark_dataframe_delta["age"] >= 60) & (spark_dataframe_delta["age"] < 70), "Mature Adult")
    .otherwise("Senior")
)

# Reorganizar para poner "age_category" en la primera posición
cols = ['age_category'] + [col for col in spark_dataframe_delta_case.columns if col != 'age_category']
spark_dataframe_delta_case.select(cols).toPandas()
Out[32]:
age_category email address country sex age profession zodiac_sign favorite_food favorite_sport favorite_movie_genre favorite_animal preferred_language hobby favorite_tv_show favorite_color favorite_drink favorite_music favorite_technology favorite_car
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

DISTINCT - CAST

Create Queries Using Spark SQL¶

In [33]:
database = "delta_spark_database"
table = "delta_dataframe_1"

query = f"""
SELECT DISTINCT
    sex, 
    country, 
    FORMAT_NUMBER(CAST(age AS DOUBLE), 2) AS age,
    zodiac_sign, 
    profession
FROM {database}.{table}
ORDER BY sex ASC, country DESC, age ASC, zodiac_sign DESC, profession ASC
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[33]:
sex country age zodiac_sign profession
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [34]:
from pyspark.sql import functions as F

spark_dataframe_delta.select(
        "sex", 
        "country", 
        F.format_number(F.col("age").cast("float"), 2).alias("age"), 
        "zodiac_sign", 
        "profession"
    ) \
    .distinct() \
    .orderBy(
        F.col("sex").asc(), 
        F.col("country").desc(), 
        F.col("age").asc(), 
        F.col("zodiac_sign").desc(), 
        F.col("profession").asc()
    ).toPandas()
Out[34]:
sex country age zodiac_sign profession
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

CREATE DATASET FOR SHORT EXAMPLES

In [35]:
# Explicit Schema with the Desired Order
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, MapType
from pyspark.sql.functions import col, struct

schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("related_id", IntegerType(), True),
        StructField("zodiac_sign", StringType(), True),
        StructField("country", StringType(), True),
        StructField("values", ArrayType(IntegerType()), True),
        StructField("fruits_and_vitamins", MapType(StringType(), StringType()), True)
    ]
)

list_data_dict = [
  {"id": 1, "name": "Nathalie", "age": 1, "related_id": 11, "zodiac_sign": "Capricorn", "country": "Colombia", "values": [1, 2, 3, 4, 5], "fruits_and_vitamins": {"Apple": "Vitamin C", "Banana": "Vitamin B6", "Orange": "Vitamin C"}}, 
  {"id": 2, "name": "Cora", "age": 3, "related_id": 5, "zodiac_sign": "Taurus", "country": "USA", "values": [6, 7, 8, 9, 10], "fruits_and_vitamins": {"Mango": "Vitamin A", "Grapes": "Vitamin K", "Papaya": "Vitamin C"}}, 
  {"id": 3, "name": "Gaby", "age": 5, "related_id": 14, "zodiac_sign": "Gemini", "country": "Colombia", "values": [11, 12, 13, 14, 15], "fruits_and_vitamins": {"Kiwi": "Vitamin C", "Pineapple": "Vitamin A", "Strawberry": "Vitamin C"}}, 
  {"id": 4, "name": "Muneca", "age": 7, "related_id": 6, "zodiac_sign": "Cancer", "country": "Portugal", "values": [16, 17, 18, 19, 20], "fruits_and_vitamins": {"Peach": "Vitamin A", "Blueberry": "Vitamin C", "Watermelon": "Vitamin A"}}, 
  {"id": 5, "name": "Principe", "age": 9, "related_id": 2, "zodiac_sign": "Leo", "country": "Argentina", "values": [21, 22, 23, 24, 25], "fruits_and_vitamins": {"Avocado": "Vitamin E", "Apple": "Vitamin C", "Grapefruit": "Vitamin C"}}, 
  {"id": 6, "name": "Ana", "age": 11, "related_id": 3, "zodiac_sign": "Virgo", "country": "Colombia", "values": [26, 27, 28, 29, 30], "fruits_and_vitamins": {"Pineapple": "Vitamin C", "Mango": "Vitamin A", "Banana": "Vitamin B6"}}, 
  {"id": 7, "name": "Cecilia", "age": 13, "related_id": 20, "zodiac_sign": "Libra", "country": "Colombia", "values": [31, 32, 33, 34, 35], "fruits_and_vitamins": {"Watermelon": "Vitamin C", "Apple": "Vitamin C", "Orange": "Vitamin C"}}, 
  {"id": 8, "name": "Lucia", "age": 15, "related_id": 13, "zodiac_sign": "Scorpio", "country": "Peru", "values": [36, 37, 38, 39, 40], "fruits_and_vitamins": {"Peach": "Vitamin A", "Mango": "Vitamin A", "Pineapple": "Vitamin C"}}, 
  {"id": 9, "name": "Zeus", "age": 17, "related_id": 7, "zodiac_sign": "Sagittarius", "country": "Mexico", "values": [41, 42, 43, 44, 45], "fruits_and_vitamins": {"Strawberry": "Vitamin C", "Orange": "Vitamin C", "Papaya": "Vitamin C"}}, 
  {"id": 10, "name": "Guadalupe", "age": 15, "related_id": 17, "zodiac_sign": "Aries", "country": "Colombia", "values": [46, 47, 48, 49, 50], "fruits_and_vitamins": {"Blueberry": "Vitamin C", "Banana": "Vitamin B6", "Grapes": "Vitamin K"}}, 
  {"id": 21, "name": "Spark", "age": 3, "related_id": 22, "zodiac_sign": "Aquarius", "country": "Portugal", "values": [51, 52, 53, 54, 55], "fruits_and_vitamins": {"Kiwi": "Vitamin C", "Papaya": "Vitamin C", "Mango": "Vitamin A"}}, 
  {"id": 22, "name": "Delta", "age": 7, "related_id": 21, "zodiac_sign": "Pisces", "country": "Rusia", "values": [56, 57, 58, 59, 60], "fruits_and_vitamins": {"Apple": "Vitamin C", "Banana": "Vitamin B6", "Strawberry": "Vitamin C"}}, 
  {"id": 11, "name": "Augusto", "age": 17, "related_id": 1, "zodiac_sign": "Aries", "country": "Argentina", "values": [61, 62, 63, 64, 65], "fruits_and_vitamins": {"Peach": "Vitamin A", "Grapefruit": "Vitamin C", "Mango": "Vitamin A"}}, 
  {"id": 12, "name": "Muiscas", "age": 13, "related_id": 16, "zodiac_sign": "Taurus", "country": "Portugal", "values": [66, 67, 68, 69, 70], "fruits_and_vitamins": {"Watermelon": "Vitamin A", "Orange": "Vitamin C", "Papaya": "Vitamin C"}}, 
  {"id": 13, "name": "Jorge", "age": 11, "related_id": 8, "zodiac_sign": "Gemini", "country": "USA", "values": [71, 72, 73, 74, 75], "fruits_and_vitamins": {"Strawberry": "Vitamin C", "Apple": "Vitamin C", "Kiwi": "Vitamin C"}}, 
  {"id": 14, "name": "Sandra", "age": 9, "related_id": 4, "zodiac_sign": "Cancer", "country": "Argentina", "values": [76, 77, 78, 79, 80], "fruits_and_vitamins": {"Grapes": "Vitamin K", "Orange": "Vitamin C", "Blueberry": "Vitamin C"}}, 
  {"id": 15, "name": "Carlos", "age": 2, "related_id": 18, "zodiac_sign": "Leo", "country": "Peru", "values": [81, 82, 83, 84, 85], "fruits_and_vitamins": {"Watermelon": "Vitamin C", "Mango": "Vitamin A", "Apple": "Vitamin C"}}, 
  {"id": 16, "name": "Isabel", "age": 4, "related_id": 12, "zodiac_sign": "Virgo", "country": "Mexico", "values": [86, 87, 88, 89, 90], "fruits_and_vitamins": {"Papaya": "Vitamin C", "Kiwi": "Vitamin C", "Pineapple": "Vitamin A"}}, 
  {"id": 17, "name": "Paola", "age": 6, "related_id": 9, "zodiac_sign": "Libra", "country": "Colombia", "values": [91, 92, 93, 94, 95], "fruits_and_vitamins": {"Strawberry": "Vitamin C", "Grapefruit": "Vitamin C", "Banana": "Vitamin B6"}}, 
  {"id": 18, "name": "David", "age": 8, "related_id": 15, "zodiac_sign": "Scorpio", "country": "Mexico", "values": [96, 97, 98, 99, 100], "fruits_and_vitamins": {"Kiwi": "Vitamin C", "Papaya": "Vitamin C", "Apple": "Vitamin C"}}, 
  {"id": 19, "name": "Sara", "age": 10, "related_id": 19, "zodiac_sign": "Sagittarius", "country": "Colombia", "values": [101, 102, 103, 104, 105], "fruits_and_vitamins": {"Pineapple": "Vitamin C", "Orange": "Vitamin C", "Mango": "Vitamin A"}}, 
  {"id": 20, "name": "Claudia", "age": 12, "related_id": 10, "zodiac_sign": "Capricorn", "country": "Mexico", "values": [106, 107, 108, 109, 110], "fruits_and_vitamins": {"Peach": "Vitamin A", "Apple": "Vitamin C", "Grapefruit": "Vitamin C"}},
  {"id": 21, "name": "Spark", "age": 3, "related_id": 22, "zodiac_sign": "Aquarius", "country": "Portugal", "values": [51, 52, 53, 54, 55], "fruits_and_vitamins": {"Kiwi": "Vitamin C", "Papaya": "Vitamin C", "Mango": "Vitamin A"}}, 
  {"id": 22, "name": "Delta", "age": 7, "related_id": 21, "zodiac_sign": "Pisces", "country": "Rusia", "values": [56, 57, 58, 59, 60], "fruits_and_vitamins": {"Apple": "Vitamin C", "Banana": "Vitamin B6", "Strawberry": "Vitamin C"}}, 
]

# Create DataFrame with the Explicit Schema
spark_dataframe_joins_full = spark_session.createDataFrame(list_data_dict, schema)

# Cache the DataFrame to Avoid Recomputing in Subsequent Operations, Improving Performance.
spark_dataframe_joins_full.cache()

# Display the DataFrame with the Correct Order
display(spark_dataframe_joins_full.toPandas())

# Remove the HashMap Column to Avoid Issues with Union, Except, etc.
spark_dataframe_joins = spark_dataframe_joins_full.drop("fruits_and_vitamins")

# Cache the DataFrame to Prevent Recalculation in Subsequent Operations, Enhancing Performance.
spark_dataframe_joins.cache()

# Display the DataFrame with the Correct Order
display(spark_dataframe_joins.toPandas())
                                                                                
id name age related_id zodiac_sign country values fruits_and_vitamins
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Decompose List and HashMap into Multiple Rows and Combine Them into New Rows¶

In [36]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark_dataframe_explode = spark_dataframe_joins_full \
    .withColumn("row_number", F.row_number().over(Window.partitionBy(F.col("country")).orderBy(F.lit(1)))) \
    .withColumn("value", F.explode(F.col("values"))) \
    .withColumn("fruit_vitamin", F.explode(F.map_entries(F.col("fruits_and_vitamins")))) \
    .select(
        "*", 
        F.col("fruit_vitamin.key").alias("fruit"), 
        F.col("fruit_vitamin.value").alias("vitamin")
    ) \
    .drop("fruit_vitamin") 

# Display the Spark DataFrame in Pandas Version
spark_dataframe_explode.toPandas()
Out[36]:
id name age related_id zodiac_sign country values fruits_and_vitamins row_number value fruit vitamin
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Multiple Datasets¶

In [37]:
num_parts = 2
spark_dataframe_uno, spark_dataframe_dos = split_spark_dataframe(spark_dataframe_joins, num_parts)

# Cache the DataFrame to prevent recalculation in subsequent operations, improving performance.
spark_dataframe_uno.cache()
spark_dataframe_dos.cache()

display(spark_dataframe_uno.toPandas(), spark_dataframe_dos.toPandas())
Successfully created 2 DataFrames.
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

CROSS TAB¶

In [38]:
# Perform the crosstab between the 2 columns
df_crosstab = spark_dataframe_joins.crosstab("age", "zodiac_sign")

df_crosstab.toPandas()
Out[38]:
age_zodiac_sign Aquarius Aries Cancer Capricorn Gemini Leo Libra Pisces Sagittarius Scorpio Taurus Virgo
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Add Total Sum Column¶

In [39]:
from pyspark.sql import functions as F

# Calculate the sum of all columns and add the 'total' column
df_crosstab_with_total = df_crosstab.select(
    "*",  # Keep all columns from the crosstab
    sum(F.coalesce(F.col(col), F.lit(0)) for col in df_crosstab.columns[1:]).alias(
        "total"
    ),
)

# Sort the DataFrame in descending order by the 'total' column
df_crosstab_with_total_sorted = df_crosstab_with_total.orderBy(
    ["total", "age_zodiac_sign"], ascending=[False, True]
)

# Convert the result to a Pandas DataFrame if necessary
df_crosstab_with_total_sorted.toPandas()
Out[39]:
age_zodiac_sign Aquarius Aries Cancer Capricorn Gemini Leo Libra Pisces Sagittarius Scorpio Taurus Virgo total
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create New Tables¶

In [40]:
database_delta = "delta_spark_database"
table_delta_all = "delta_dataframe_all"
table_delta_uno = "delta_dataframe_uno"
table_delta_dos = "delta_dataframe_dos"
dataframe_delta_all = spark_dataframe_joins
dataframe_delta_1 = spark_dataframe_uno
dataframe_delta_2 = spark_dataframe_dos
delta_warehouse_dir = warehouse_dir
partition_by = ['age']

create_delta_table_in_database(
    spark_session,
    database_delta,
    table_delta_all,
    dataframe_delta_all,
    delta_warehouse_dir,
    partition_by
)

create_delta_table_in_database(
    spark_session,
    database_delta,
    table_delta_uno,
    dataframe_delta_1,
    delta_warehouse_dir,
    partition_by
)
create_delta_table_in_database(
    spark_session,
    database_delta,
    table_delta_dos,
    dataframe_delta_2,
    delta_warehouse_dir,
    partition_by
)
list_all_databases_and_tables(spark_session)
Table 'delta_spark_database.delta_dataframe_all' dropped.
24/12/27 20:16:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/12/27 20:16:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/12/27 20:16:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/12/27 20:16:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/12/27 20:16:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/12/27 20:16:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 58.46% for 13 writers
24/12/27 20:16:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 54.29% for 14 writers
24/12/27 20:16:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 50.67% for 15 writers
24/12/27 20:16:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 54.29% for 14 writers
24/12/27 20:16:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 58.46% for 13 writers
24/12/27 20:16:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/12/27 20:16:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/12/27 20:16:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/12/27 20:16:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/12/27 20:16:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
Property 'delta.enableChangeDataFeed' enabled for table 'delta_spark_database.delta_dataframe_all'.
Table 'delta_dataframe_all' created in database 'delta_spark_database' at './warehouse-spark/spark_catalog/database/delta_spark_database/delta_dataframe_all' with partitioning by ['age'].
Table 'delta_spark_database.delta_dataframe_uno' dropped.
Property 'delta.enableChangeDataFeed' enabled for table 'delta_spark_database.delta_dataframe_uno'.
Table 'delta_dataframe_uno' created in database 'delta_spark_database' at './warehouse-spark/spark_catalog/database/delta_spark_database/delta_dataframe_uno' with partitioning by ['age'].
Table 'delta_spark_database.delta_dataframe_dos' dropped.
Property 'delta.enableChangeDataFeed' enabled for table 'delta_spark_database.delta_dataframe_dos'.
Table 'delta_dataframe_dos' created in database 'delta_spark_database' at './warehouse-spark/spark_catalog/database/delta_spark_database/delta_dataframe_dos' with partitioning by ['age'].
The following databases and tables are present in the Spark Catalog.

Database: default, Table: delta_sql

Database: delta_spark_database, Table: delta_dataframe_1

Database: delta_spark_database, Table: delta_dataframe_2

Database: delta_spark_database, Table: delta_dataframe_all

Database: delta_spark_database, Table: delta_dataframe_dos

Database: delta_spark_database, Table: delta_dataframe_uno

Out[40]:
{'default': ['delta_sql'],
 'delta_spark_database': ['delta_dataframe_1',
  'delta_dataframe_2',
  'delta_dataframe_all',
  'delta_dataframe_dos',
  'delta_dataframe_uno']}

WINDOW FUNCTIONS

Create Queries Using Spark SQL¶

In [41]:
database = "delta_spark_database"
table = "delta_dataframe_all"

query = f"""
SELECT
    country,  -- Country of residence
    age,  -- Age of the user
    -- Window functions applied to age and country
    ROW_NUMBER() OVER (PARTITION BY country ORDER BY age DESC) AS row_number,  -- Assigns a row number based on age within the country
    COUNT(*) OVER (PARTITION BY country) AS count_by_country,  -- Counts the number of users by country
    RANK() OVER (PARTITION BY country ORDER BY age DESC) AS rank_by_country,  -- Assigns a rank based on age within each country, considering ties
    DENSE_RANK() OVER (PARTITION BY country ORDER BY age DESC) AS dense_rank_by_country,  -- Dense rank by country, no gaps in rank in case of ties
    MIN(age) OVER (PARTITION BY country) AS min_age_by_country,  -- Minimum age within each country
    MAX(age) OVER (PARTITION BY country) AS max_age_by_country,  -- Maximum age within each country
    SUM(age) OVER (PARTITION BY country) AS sum_age_by_country,  -- Sum of ages within each country
    AVG(age) OVER (PARTITION BY country) AS avg_age_by_country,  -- Average age within each country

    -- Grouping functions for age
    NTILE(3) OVER (PARTITION BY country ORDER BY age DESC) AS ntile_by_country,  -- Divides users into quartiles based on age within each country
    zodiac_sign,  -- Zodiac sign of the user
    LAG(zodiac_sign, 1) OVER (PARTITION BY country ORDER BY age DESC) AS lag_zodiac_sign,  -- Zodiac sign of the previous user within each country
    LEAD(zodiac_sign, 1) OVER (PARTITION BY country ORDER BY age DESC) AS lead_zodiac_sign,  -- Zodiac sign of the next user within each country
    FIRST_VALUE(zodiac_sign) OVER (PARTITION BY country ORDER BY age DESC) AS first_zodiac_sign,  -- First zodiac sign recorded within each country
    LAST_VALUE(zodiac_sign) OVER (PARTITION BY country ORDER BY age DESC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_zodiac_sign -- Last zodiac sign recorded within each country, considering the entire partition

FROM {database}.{table}
ORDER BY country ASC
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[41]:
country age row_number count_by_country rank_by_country dense_rank_by_country min_age_by_country max_age_by_country sum_age_by_country avg_age_by_country ntile_by_country zodiac_sign lag_zodiac_sign lead_zodiac_sign first_zodiac_sign last_zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [42]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Apply all operations in a single statement
spark_dataframe_joins.select(
    "country",
    "age",
    F.row_number().over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("row_number"),
    F.count("*").over(Window.partitionBy("country")).alias("count_by_country"),
    F.rank().over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("rank_by_country"),
    F.dense_rank().over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("dense_rank_by_country"),
    F.min("age").over(Window.partitionBy("country")).alias("min_age_by_country"),
    F.max("age").over(Window.partitionBy("country")).alias("max_age_by_country"),
    F.sum("age").over(Window.partitionBy("country")).alias("sum_age_by_country"),
    F.avg("age").over(Window.partitionBy("country")).alias("avg_age_by_country"),
    F.ntile(3).over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("ntile_by_country"),
    "zodiac_sign",
    F.lag("zodiac_sign", 1).over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("lag_zodiac_sign"),
    F.lead("zodiac_sign", 1).over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("lead_zodiac_sign"),
    F.first("zodiac_sign").over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("first_zodiac_sign"),
    F.last("zodiac_sign").over(
        Window.partitionBy("country")
        .orderBy(F.col("age").desc())
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    ).alias("last_zodiac_sign")
).orderBy("country", "row_number").toPandas()
Out[42]:
country age row_number count_by_country rank_by_country dense_rank_by_country min_age_by_country max_age_by_country sum_age_by_country avg_age_by_country ntile_by_country zodiac_sign lag_zodiac_sign lead_zodiac_sign first_zodiac_sign last_zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

CONCATENATE VALUES AND CREATE ADDITIONAL COLUMNS

Create Queries Using Spark SQL¶

In [43]:
database = "delta_spark_database"
table = "delta_dataframe_all"

query = f"""
WITH NumberedRows AS (
    SELECT
        ROW_NUMBER() OVER (PARTITION BY country ORDER BY id) AS row_number,  -- Assigns consecutive numbers for each country
        (id + age + related_id) AS SUM_NUMBERS,                             -- Sums the numeric columns row by row
        CONCAT(name, ' ', zodiac_sign, ' ', country) AS CONCATENATE_STRINGS,  -- Concatenates the strings with spaces
        values
    FROM {database}.{table}
)
SELECT * FROM NumberedRows;
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[43]:
row_number SUM_NUMBERS CONCATENATE_STRINGS values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [44]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Create processed DataFrame
spark_dataframe_joins \
    .withColumn("row_number", F.row_number().over(Window.partitionBy(F.col("country")).orderBy(F.lit(1)))) \
    .select(
        "row_number",
        (F.col("id") + F.col("age") + F.col("related_id")).alias("SUM_NUMBERS"),
        F.concat_ws(" ", F.col("name"), F.col("zodiac_sign"), F.col("country")).alias("CONCATENATE_STRINGS"),
        F.col("values")
    ) \
    .toPandas()
Out[44]:
row_number SUM_NUMBERS CONCATENATE_STRINGS values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

SEPARATE THE LIST VALUES FOR EACH INDIVIDUAL RECORD¶

Create Queries Using Spark SQL¶

In [45]:
database = "delta_spark_database"
table = "delta_dataframe_all"

query = f"""
WITH NumberedRows AS (
    SELECT
        ROW_NUMBER() OVER (PARTITION BY country ORDER BY id) AS row_number,  -- Assigns consecutive numbers for each country
        (id + age + related_id) AS SUM_NUMBERS,                             -- Sums the numeric columns row by row
        CONCAT(name, ' ', zodiac_sign, ' ', country) AS CONCATENATE_STRINGS,  -- Concatenates the strings with spaces
        explode(values) AS value                                              -- Expands the array elements
    FROM {database}.{table}
)
SELECT * FROM NumberedRows;
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[45]:
row_number SUM_NUMBERS CONCATENATE_STRINGS value
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [46]:
from pyspark.sql.functions import explode

# Create processed DataFrame
spark_dataframe_joins \
    .withColumn("row_number", F.row_number().over(Window.partitionBy(col("country")).orderBy(F.lit(1)))) \
    .select(
        "row_number",  # Row number for each country partition
        (F.col("id") + F.col("age") + F.col("related_id")).alias("SUM_NUMBERS"),  # Sum of numeric columns for each row
        F.concat_ws(" ", F.col("name"), F.col("zodiac_sign"), F.col("country")).alias("CONCATENATE_STRINGS"),  # Concatenate strings with space
        explode(F.col("values")).alias("value")  # Explode array elements into separate rows
    ) \
    .toPandas()  # Convert the result to Pandas DataFrame
Out[46]:
row_number SUM_NUMBERS CONCATENATE_STRINGS value
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

OPERACIONES DE CONJUNTOS

UNION

Create Queries Using Spark SQL¶

In [47]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT * 
FROM {database}.{table1}

UNION

SELECT * 
FROM {database}.{table2}
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[47]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [48]:
spark_dataframe_uno.union(spark_dataframe_dos).toPandas()
Out[48]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

UNION ALL

Create Queries Using Spark SQL¶

In [49]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT * 
FROM {database}.{table1}

UNION ALL

SELECT * 
FROM {database}.{table2}
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[49]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [50]:
spark_dataframe_uno.unionByName(spark_dataframe_dos).toPandas()
Out[50]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

INTERCEPT

Create Queries Using Spark SQL¶

In [51]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT *
FROM {database}.{table1}

INTERSECT

SELECT *
FROM {database}.{table2}
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[51]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [52]:
spark_dataframe_uno.intersect(spark_dataframe_dos).toPandas()
Out[52]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

EXCEPT

Create Queries Using Spark SQL¶

In [53]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT *
FROM {database}.{table1}

EXCEPT

SELECT *
FROM {database}.{table2}
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[53]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [54]:
spark_dataframe_uno.exceptAll(spark_dataframe_dos).toPandas()
Out[54]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
In [55]:
spark_dataframe_uno.subtract(spark_dataframe_dos).toPandas()
Out[55]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

RENAME DATAFRAMES COLUMNS FOR EXAMPLES

SELF JOIN

Create Queries Using Spark SQL¶

In [56]:
database = "delta_spark_database"
table = "delta_dataframe_uno"

query = f"""
SELECT *
FROM {database}.{table} AS TABLE_UNO_1
INNER JOIN {database}.{table} AS TABLE_UNO_2
    ON TABLE_UNO_1.id = TABLE_UNO_2.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[56]:
id name age related_id zodiac_sign country values id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [57]:
from pyspark.sql import functions as F

# Get all column names from both DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]

# Perform the self join
spark_dataframe_uno.alias("spark_dataframe_uno") \
    .join(
        spark_dataframe_dos.alias("spark_dataframe_dos"),
        F.col("spark_dataframe_uno.id") == F.col("spark_dataframe_dos.related_id"),
        "inner"
    ) \
    .select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
    .toPandas()  # Convert the result to Pandas DataFrame
Out[57]:
spark_dataframe_uno.id spark_dataframe_uno.name spark_dataframe_uno.age spark_dataframe_uno.related_id spark_dataframe_uno.zodiac_sign spark_dataframe_uno.country spark_dataframe_uno.values spark_dataframe_dos.id spark_dataframe_dos.name spark_dataframe_dos.age spark_dataframe_dos.related_id spark_dataframe_dos.zodiac_sign spark_dataframe_dos.country spark_dataframe_dos.values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

INNER JOIN

Create Queries Using Spark SQL¶

In [58]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
INNER JOIN {database}.{table2} AS TABLE_DOS
    ON TABLE_UNO.id = TABLE_DOS.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[58]:
id name age related_id zodiac_sign country values id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [59]:
# Get all column names from both DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]

spark_dataframe_uno.alias("spark_dataframe_uno").join(
    spark_dataframe_dos.alias("spark_dataframe_dos"),
    col("spark_dataframe_uno.id") == col("spark_dataframe_dos.related_id"),
    "inner"
) \
.select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
.toPandas()  # Convert the result to Pandas DataFrame
Out[59]:
spark_dataframe_uno.id spark_dataframe_uno.name spark_dataframe_uno.age spark_dataframe_uno.related_id spark_dataframe_uno.zodiac_sign spark_dataframe_uno.country spark_dataframe_uno.values spark_dataframe_dos.id spark_dataframe_dos.name spark_dataframe_dos.age spark_dataframe_dos.related_id spark_dataframe_dos.zodiac_sign spark_dataframe_dos.country spark_dataframe_dos.values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

LEFT JOIN

Create Queries Using Spark SQL¶

In [60]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
LEFT OUTER JOIN {database}.{table2} AS TABLE_DOS
    ON TABLE_UNO.id = TABLE_DOS.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[60]:
id name age related_id zodiac_sign country values id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [61]:
# Get all column names from both DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]

spark_dataframe_uno.alias("spark_dataframe_uno") \
    .join(
        spark_dataframe_dos.alias("spark_dataframe_dos"),
        F.col("spark_dataframe_uno.id") == F.col("spark_dataframe_dos.related_id"),
        "left"
    ) \
    .select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
    .toPandas()  # Convert the result to a Pandas DataFrame
Out[61]:
spark_dataframe_uno.id spark_dataframe_uno.name spark_dataframe_uno.age spark_dataframe_uno.related_id spark_dataframe_uno.zodiac_sign spark_dataframe_uno.country spark_dataframe_uno.values spark_dataframe_dos.id spark_dataframe_dos.name spark_dataframe_dos.age spark_dataframe_dos.related_id spark_dataframe_dos.zodiac_sign spark_dataframe_dos.country spark_dataframe_dos.values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

ANTI LEFT JOIN

Create Queries Using Spark SQL¶

In [62]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
LEFT ANTI JOIN {database}.{table2} AS TABLE_DOS
    ON TABLE_UNO.id = TABLE_DOS.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[62]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
In [63]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT TABLE_UNO.*
FROM {database}.{table1} AS TABLE_UNO
LEFT OUTER JOIN {database}.{table2} AS TABLE_DOS
    ON TABLE_UNO.id = TABLE_DOS.related_id
WHERE TABLE_DOS.related_id IS NULL
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[63]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [64]:
spark_dataframe_uno.alias("spark_dataframe_uno") \
    .join(
        spark_dataframe_dos.alias("spark_dataframe_dos"),
        F.col("spark_dataframe_uno.id") == F.col("spark_dataframe_dos.related_id"),
        "left_anti"
    ) \
    .toPandas()
Out[64]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

RIGHT JOIN

Create Queries Using Spark SQL¶

In [65]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
RIGHT OUTER JOIN {database}.{table2} AS TABLE_DOS
    ON TABLE_UNO.id = TABLE_DOS.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[65]:
id name age related_id zodiac_sign country values id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [66]:
# Get all column names from both DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]

spark_dataframe_uno.alias("spark_dataframe_uno") \
    .join(
        spark_dataframe_dos.alias("spark_dataframe_dos"),
        F.col("spark_dataframe_uno.id") == F.col("spark_dataframe_dos.related_id"),
        "right"
    ) \
    .select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
    .toPandas()  # Convert the result to a Pandas DataFrame
Out[66]:
spark_dataframe_uno.id spark_dataframe_uno.name spark_dataframe_uno.age spark_dataframe_uno.related_id spark_dataframe_uno.zodiac_sign spark_dataframe_uno.country spark_dataframe_uno.values spark_dataframe_dos.id spark_dataframe_dos.name spark_dataframe_dos.age spark_dataframe_dos.related_id spark_dataframe_dos.zodiac_sign spark_dataframe_dos.country spark_dataframe_dos.values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

ANTI RIGHT JOIN

Create Queries Using Spark SQL¶

In [67]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT TABLE_DOS.*
FROM {database}.{table1} AS TABLE_UNO
RIGHT OUTER JOIN {database}.{table2} AS TABLE_DOS
    ON TABLE_UNO.id = TABLE_DOS.related_id
WHERE TABLE_UNO.id IS NULL
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[67]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [68]:
spark_dataframe_uno.alias("spark_dataframe_uno") \
    .join(
        spark_dataframe_dos.alias("spark_dataframe_dos"),
        F.col("spark_dataframe_uno.id") == F.col("spark_dataframe_dos.related_id"),
        "right"
    ) \
    .filter(F.col("spark_dataframe_uno.id").isNull()) \
    .select([F.col(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]) \
    .toPandas()
Out[68]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

INVIRTIENDO LAS TABLAS Y USANDO EL ANTILEFT JOIN¶

In [69]:
spark_dataframe_dos.alias("spark_dataframe_dos") \
    .join(
        spark_dataframe_uno.alias("spark_dataframe_uno"),
        F.col("spark_dataframe_dos.id") == F.col("spark_dataframe_uno.related_id"),
        "left_anti"
    ) \
    .toPandas()
Out[69]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

OUTER JOIN

Create Queries Using Spark SQL¶

In [70]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
FULL OUTER JOIN {database}.{table2} AS TABLE_DOS
    ON TABLE_UNO.id = TABLE_DOS.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[70]:
id name age related_id zodiac_sign country values id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [71]:
# Get all column names from both DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]

# Perform the join with aliases and select all columns automatically
spark_dataframe_uno.alias("spark_dataframe_uno") \
    .join(
        spark_dataframe_dos.alias("spark_dataframe_dos"),
        F.col("spark_dataframe_uno.id") == F.col("spark_dataframe_dos.related_id"),
        "outer"
    ) \
    .select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
    .toPandas()  # Convert the result to a Pandas DataFrame
Out[71]:
spark_dataframe_uno.id spark_dataframe_uno.name spark_dataframe_uno.age spark_dataframe_uno.related_id spark_dataframe_uno.zodiac_sign spark_dataframe_uno.country spark_dataframe_uno.values spark_dataframe_dos.id spark_dataframe_dos.name spark_dataframe_dos.age spark_dataframe_dos.related_id spark_dataframe_dos.zodiac_sign spark_dataframe_dos.country spark_dataframe_dos.values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

ANTI OUTER JOIN

Create Queries Using Spark SQL¶

In [72]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT 
    TABLE_UNO.*, 
    TABLE_DOS.* 
FROM {database}.{table1} AS TABLE_UNO
FULL OUTER JOIN {database}.{table2} AS TABLE_DOS
    ON TABLE_UNO.id = TABLE_DOS.related_id
WHERE TABLE_UNO.id IS NULL
    OR TABLE_DOS.related_id IS NULL
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[72]:
id name age related_id zodiac_sign country values id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [73]:
# Get all column names from both DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]

spark_dataframe_uno.alias("spark_dataframe_uno") \
    .join(
        spark_dataframe_dos.alias("spark_dataframe_dos"),  # Alias for spark_dataframe_dos
        F.col("spark_dataframe_uno.id") == F.col("spark_dataframe_dos.related_id"),  # Join condition
        "outer"  # Join type: OUTER JOIN
    ) \
    .filter(
        (
            (F.col("spark_dataframe_uno.id").isNotNull() & F.col("spark_dataframe_dos.related_id").isNull())  # Condition when spark_dataframe_uno.id is not null and spark_dataframe_dos.related_id is null
            | (F.col("spark_dataframe_uno.id").isNull() & F.col("spark_dataframe_dos.related_id").isNotNull())  # Condition when spark_dataframe_uno.id is null and spark_dataframe_dos.related_id is not null
        )
    ) \
    .select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
    .toPandas()
Out[73]:
spark_dataframe_uno.id spark_dataframe_uno.name spark_dataframe_uno.age spark_dataframe_uno.related_id spark_dataframe_uno.zodiac_sign spark_dataframe_uno.country spark_dataframe_uno.values spark_dataframe_dos.id spark_dataframe_dos.name spark_dataframe_dos.age spark_dataframe_dos.related_id spark_dataframe_dos.zodiac_sign spark_dataframe_dos.country spark_dataframe_dos.values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

CROSS JOIN

Create Queries Using Spark SQL¶

In [74]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

# RESTORE TABLE {database}.{table1} TO VERSION AS OF {version};
query = f"""
SELECT * FROM {database}.{table1}
CROSS JOIN
{database}.{table2}
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[74]:
id name age related_id zodiac_sign country values id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
In [75]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
CROSS JOIN {database}.{table2} AS TABLE_DOS
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[75]:
id name age related_id zodiac_sign country values id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Create Queries Using Spark DataFrame¶

In [76]:
from pyspark.sql import functions as F

# Get all column names from both DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]

# Perform the CROSS JOIN with aliases and automatically select all columns
spark_dataframe_uno.alias("spark_dataframe_uno") \
    .crossJoin(spark_dataframe_dos.alias("spark_dataframe_dos")) \
    .select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
    .toPandas()  # Convert the result to a Pandas DataFrame
Out[76]:
spark_dataframe_uno.id spark_dataframe_uno.name spark_dataframe_uno.age spark_dataframe_uno.related_id spark_dataframe_uno.zodiac_sign spark_dataframe_uno.country spark_dataframe_uno.values spark_dataframe_dos.id spark_dataframe_dos.name spark_dataframe_dos.age spark_dataframe_dos.related_id spark_dataframe_dos.zodiac_sign spark_dataframe_dos.country spark_dataframe_dos.values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

UDFs (User Defined Functions)

CREATE A PYTHON FUNCION AND REGISTER LIKE UDF FUNCTION¶

Section Content
What are UDFs? UDFs (User Defined Functions) are custom functions created by developers to perform specific operations in distributed environments like Spark, extending its capabilities when native functions are not sufficient.
Advantages - Flexibility: Enables custom logic.
- Compatibility: Integrates seamlessly with DataFrames and Datasets.
- Reusability: Can be reused across multiple projects.
Disadvantages - Performance: Slower due to serialization and lack of optimization.
- Debugging: Harder to maintain and troubleshoot.
Types of UDFs 1. Scala UDFs: More efficient, executed directly in the JVM.
2. Python UDFs: Less efficient, suitable for complex logic.
3. Pandas UDFs: Operate on vectors, better for intensive computations.
Best Practices 1. Avoid UDFs when possible: Prefer native functions.
2. Use Pandas UDFs if necessary: For vectorized processing in PySpark.
3. Define data types: Ensure clarity and prevent errors.
4. Minimize logic in UDFs: Keep it simple.
5. Testing and monitoring: Validate with real data.
Alternatives to UDFs 1. Native Spark functions: Use methods like filter, groupBy, and agg.
2. Column Expressions: Perform transformations using expressions like col and expr.
Conclusion UDFs are useful for customizing logic in Spark but should be used sparingly to prioritize performance and scalability. Always consider native or more efficient alternatives before resorting to UDFs.
Register UDF Function for DataFrames Step 1: Define a custom function in Python or Scala.
Step 2: Register the function as a UDF using udf and specify the return type.
Step 3: Apply the UDF to DataFrame columns using methods like withColumn.

Python Example:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def divide_by_1000(value):
    return value / 1000 if value is not None else None

divide_by_1000_udf = udf(divide_by_1000, DoubleType())
df_with_converted = df.withColumn("converted_value", divide_by_1000_udf(df["value"]))
df_with_converted.show()


Scala Example:

import org.apache.spark.sql.functions.udf
val divideBy1000 = (value: java.lang.Double) => {
if (value != null) value / 1000 else null
}
val divideBy1000UDF = udf(divideBy1000)
val dfWithConverted = df.withColumn("converted_value", divideBy1000UDF($"value"))
dfWithConverted.show()

SQL UDF FUNCTIONS¶

In [77]:
from pyspark.sql.types import (
    DoubleType,
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# List all the UDF functions registered in the Spark catalog
def list_sql_udf_functions(spark_session):
    # List all registered functions
    functions = spark_session.catalog.listFunctions()

    # Display the names of the functions that are UDFs
    udf_functions = []
    for func in functions:
        if func.className and "UDFRegistration" in func.className:
            udf_functions.append(func.name)
    
    return udf_functions

# Define the function that divides the input value by 1000
def divide_by_1000_spark_sql(value):
    return value / 1000 if value is not None else None

# Define the function that multiplies the input value by 10
def multiply_by_10_spark_sql(value):
    return value * 10 if value is not None else None
    
# Define the function that converts text to uppercase
def to_uppercase_spark_sql(value):
    return value.upper() if value is not None else None

# Define the function that replaces spaces with "&"
def replace_spaces_with_ampersand_spark_sql(value):
    return value.replace(" ", "&") if value is not None else None

# Define the function that replaces lowercase vowels with uppercase vowels
def replace_lowercase_vowels_with_uppercase_spark_sql(value):
    if value is None:
        return None
    # Map lowercase vowels to uppercase in a single pass
    vowels = {'a': 'A', 'e': 'E', 'i': 'I', 'o': 'O', 'u': 'U'}
    return ''.join(vowels.get(c, c) for c in value)

# Define the function that replaces vowels and some characters with similar-looking numbers
def replace_chars_with_similar_numbers_spark_sql(value):
    if value is None:
        return None
    # Map characters to similar-looking numbers for both uppercase and lowercase
    chars_to_numbers = {
        'a': '4', 'e': '3', 'i': '1', 'o': '0', 's': '5', 't': '7', 'b': '8', 'g': '9', 
        'l': '1', 'z': '2', 
        'A': '4', 'E': '3', 'I': '1', 'O': '0', 'S': '5', 'T': '7', 'B': '8', 'G': '9', 
        'L': '1', 'Z': '2'
    }
    return ''.join(chars_to_numbers.get(c, c) for c in value)

Register the UDF function into the Spark SQL catalog Just for use in SQL queries¶

In [78]:
# Register the function as a UDF in Spark
spark_session.udf.register(name="divide_column_value_by_1000", f=divide_by_1000_spark_sql, returnType=FloatType())
spark_session.udf.register(name="multiply_values_10_times", f=multiply_by_10_spark_sql, returnType=IntegerType())
spark_session.udf.register(name="convert_string_to_uppercase", f=to_uppercase_spark_sql, returnType=StringType())
spark_session.udf.register(name="convert_blank_spaces_with_ampersand", f=replace_spaces_with_ampersand_spark_sql, returnType=StringType())
spark_session.udf.register(name="replace_lowercase_vowels_with_uppercase_vowels", f=replace_lowercase_vowels_with_uppercase_spark_sql, returnType=StringType())
spark_session.udf.register(name="replace_chars_by_numbers", f=replace_chars_with_similar_numbers_spark_sql, returnType=StringType())
Out[78]:
<function __main__.replace_chars_with_similar_numbers_spark_sql(value)>

List all the SQL UDF functions registered in the Spark catalog¶

In [79]:
# list udf functions
udf_functions = list_sql_udf_functions(spark_session)
print(udf_functions)
['convert_blank_spaces_with_ampersand', 'convert_string_to_uppercase', 'divide_column_value_by_1000', 'multiply_values_10_times', 'replace_chars_by_numbers', 'replace_lowercase_vowels_with_uppercase_vowels']

USE UDF SQL FUNCTIONS¶

In [80]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"

query = f"""
SELECT 
    id, 
    convert_string_to_uppercase(name) AS name, 
    divide_column_value_by_1000(age) AS age, 
    multiply_values_10_times(related_id) AS related_id, 
    replace_lowercase_vowels_with_uppercase_vowels(zodiac_sign) AS zodiac_sign, 
    replace_chars_by_numbers(country) AS country, 
    values
FROM {database}.{table1} AS TABLE_UNO
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
                                                                                
Out[80]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Register UDF Functions in Spark Session Just for DataFrame Operations Using Spark Decorator¶

EXAMPLES


Python¶

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Define the function
def divide_by_1000_spark_dataframe(value):
    return value / 1000 if value is not None else None

# Register the function as a UDF
divide_by_1000_udf = udf(divide_by_1000_spark_dataframe, DoubleType())

# Create a sample DataFrame
data = [(1000,), (2000,), (None,)]
df = spark.createDataFrame(data, ["value"])

# Use the UDF in the DataFrame
df_with_converted = df.withColumn("converted_value", divide_by_1000_udf(df["value"]))
df_with_converted.show()

Scala¶

import org.apache.spark.sql.functions.udf

// Define the function
val divideBy1000 = (value: java.lang.Double) => {
  if (value != null) value / 1000 else null
}

// Register the function as a UDF
val divideBy1000UDF = udf(divideBy1000)

// Create a sample DataFrame
val data = Seq((1000.0), (2000.0), (null))
val df = spark.createDataFrame(data.map(Tuple1(_))).toDF("value")

// Use the UDF in the DataFrame
val dfWithConverted = df.withColumn("converted_value", divideBy1000UDF($"value"))
dfWithConverted.show()

REGISTER UDF FUNCTION FOR DATAFRAMES¶

METHOD 1 - Using Decorator¶

@udf(returnType=FloatType()) 
def divide_by_1000_spark_dataframe(value):
    return value / 1000 if value is not None else None
In [81]:
from pyspark.sql.functions import udf

# Define the function that divides the input value by 1000
@udf(returnType=FloatType()) 
def divide_by_1000_spark_dataframe(value):
    return value / 1000 if value is not None else None

# Define the function that multiplies the input value by 10
@udf(returnType=IntegerType()) 
def multiply_by_10_spark_dataframe(value):
    return value * 10 if value is not None else None
    
# Define the function that converts text to uppercase
@udf(returnType=StringType())
def to_uppercase_spark_dataframe(value):
    return value.upper() if value is not None else None

# Define the function that replaces spaces with "&"
@udf(returnType=StringType())
def replace_spaces_with_ampersand_spark_dataframe(value):
    return value.replace(" ", "&") if value is not None else None

Converting a Python Function into a Spark UDF¶

METHOD 2 - Convert Function Using the udf Method and Store It in a Variable¶

replace_lowercase_vowels_with_uppercase = udf(replace_lowercase_vowels_with_uppercase_spark_dataframe, StringType())¶

In [82]:
# Define the function that replaces lowercase vowels with uppercase vowels
def replace_lowercase_vowels_with_uppercase_spark_dataframe(value):
    if value is None:
        return None
    # Map lowercase vowels to uppercase in a single pass
    vowels = {'a': 'A', 'e': 'E', 'i': 'I', 'o': 'O', 'u': 'U'}
    return ''.join(vowels.get(c, c) for c in value)

# Define the function that replaces vowels and some characters with similar-looking numbers
def replace_chars_with_similar_numbers_spark_dataframe(value):
    if value is None:
        return None
    # Map characters to similar-looking numbers for both uppercase and lowercase
    chars_to_numbers = {
        'a': '4', 'e': '3', 'i': '1', 'o': '0', 's': '5', 't': '7', 'b': '8', 'g': '9', 
        'l': '1', 'z': '2', 
        'A': '4', 'E': '3', 'I': '1', 'O': '0', 'S': '5', 'T': '7', 'B': '8', 'G': '9', 
        'L': '1', 'Z': '2'
    }
    return ''.join(chars_to_numbers.get(c, c) for c in value)

# Register the function as a UDF
replace_lowercase_vowels_with_uppercase = udf(replace_lowercase_vowels_with_uppercase_spark_dataframe, StringType())
replace_chars_with_similar_numbers = udf(replace_chars_with_similar_numbers_spark_dataframe, StringType())

USE THE UDF DATAFRAME FUNCTIONS¶

In [83]:
# Aplicar las UDFs a las columnas específicas correctamente
spark_dataframe_uno \
    .withColumn("age", divide_by_1000_spark_dataframe(col("age"))) \
    .withColumn("related_id", multiply_by_10_spark_dataframe(col("related_id"))) \
    .withColumn("name", to_uppercase_spark_dataframe(col("name"))) \
    .withColumn("zodiac_sign", replace_lowercase_vowels_with_uppercase(col("zodiac_sign"))) \
    .withColumn("country", replace_chars_with_similar_numbers(col("country"))) \
.toPandas()
Out[83]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Perform JOINs Using the Function with Parameters

RENAME DATAFRAMES COLUMNS FOR EXAMPLES¶

In [84]:
spark_dataframe_uno_renamed = spark_dataframe_uno.select(
    spark_dataframe_uno["id"].alias("spark_dataframe_uno_id"),
    spark_dataframe_uno["name"].alias("spark_dataframe_uno_name"),
    spark_dataframe_uno["age"].alias("spark_dataframe_uno_age"),
    spark_dataframe_uno["related_id"].alias("spark_dataframe_uno_related_id"),
    spark_dataframe_uno["zodiac_sign"].alias("spark_dataframe_uno_zodiac_sign"),
    spark_dataframe_uno["country"].alias("spark_dataframe_uno_country"),
    spark_dataframe_uno["values"].alias("spark_dataframe_uno_values"),
)

spark_dataframe_dos_renamed = spark_dataframe_dos.select(
    spark_dataframe_dos["id"].alias("spark_dataframe_dos_id"),
    spark_dataframe_dos["name"].alias("spark_dataframe_dos_name"),
    spark_dataframe_dos["age"].alias("spark_dataframe_dos_age"),
    spark_dataframe_dos["related_id"].alias("spark_dataframe_dos_related_id"),
    spark_dataframe_dos["zodiac_sign"].alias("spark_dataframe_dos_zodiac_sign"),
    spark_dataframe_dos["country"].alias("spark_dataframe_dos_country"),
    spark_dataframe_dos["values"].alias("spark_dataframe_dos_values"),
)
In [85]:
spark_dataframe_uno_renamed.toPandas()
Out[85]:
spark_dataframe_uno_id spark_dataframe_uno_name spark_dataframe_uno_age spark_dataframe_uno_related_id spark_dataframe_uno_zodiac_sign spark_dataframe_uno_country spark_dataframe_uno_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
In [86]:
joins = [
    "inner_join",
    "left_join",
    "anti_left_join",
    "right_join",
    "anti_right_join",
    "outer_join",
    "anti_outer_join",
    "cross_join",
    "intersect",
    "except",
    "union",
    "union_all",
]

for join in joins:
    join_results = join_spark_dataframes(
        left_df=spark_dataframe_uno_renamed,
        right_df=spark_dataframe_dos_renamed,
        join_type=join,
        left_column="spark_dataframe_uno_id",
        right_column="spark_dataframe_dos_related_id",
        return_pandas=True,
    )

    print(f"Join -> {join}")
    display(join_results)
    print()

print("Join -> self")
display(join_spark_dataframes(
        left_df=spark_dataframe_uno_renamed,
        right_df=spark_dataframe_dos_renamed,
        join_type='self',
        left_column="spark_dataframe_uno_id",
        right_column="spark_dataframe_uno_related_id",
        return_pandas=True,
    ))
print()
Join -> inner_join
spark_dataframe_uno_id spark_dataframe_uno_name spark_dataframe_uno_age spark_dataframe_uno_related_id spark_dataframe_uno_zodiac_sign spark_dataframe_uno_country spark_dataframe_uno_values spark_dataframe_dos_id spark_dataframe_dos_name spark_dataframe_dos_age spark_dataframe_dos_related_id spark_dataframe_dos_zodiac_sign spark_dataframe_dos_country spark_dataframe_dos_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Join -> left_join
spark_dataframe_uno_id spark_dataframe_uno_name spark_dataframe_uno_age spark_dataframe_uno_related_id spark_dataframe_uno_zodiac_sign spark_dataframe_uno_country spark_dataframe_uno_values spark_dataframe_dos_id spark_dataframe_dos_name spark_dataframe_dos_age spark_dataframe_dos_related_id spark_dataframe_dos_zodiac_sign spark_dataframe_dos_country spark_dataframe_dos_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Join -> anti_left_join
spark_dataframe_uno_id spark_dataframe_uno_name spark_dataframe_uno_age spark_dataframe_uno_related_id spark_dataframe_uno_zodiac_sign spark_dataframe_uno_country spark_dataframe_uno_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Join -> right_join
spark_dataframe_uno_id spark_dataframe_uno_name spark_dataframe_uno_age spark_dataframe_uno_related_id spark_dataframe_uno_zodiac_sign spark_dataframe_uno_country spark_dataframe_uno_values spark_dataframe_dos_id spark_dataframe_dos_name spark_dataframe_dos_age spark_dataframe_dos_related_id spark_dataframe_dos_zodiac_sign spark_dataframe_dos_country spark_dataframe_dos_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Join -> anti_right_join
spark_dataframe_dos_id spark_dataframe_dos_name spark_dataframe_dos_age spark_dataframe_dos_related_id spark_dataframe_dos_zodiac_sign spark_dataframe_dos_country spark_dataframe_dos_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Join -> outer_join
spark_dataframe_uno_id spark_dataframe_uno_name spark_dataframe_uno_age spark_dataframe_uno_related_id spark_dataframe_uno_zodiac_sign spark_dataframe_uno_country spark_dataframe_uno_values spark_dataframe_dos_id spark_dataframe_dos_name spark_dataframe_dos_age spark_dataframe_dos_related_id spark_dataframe_dos_zodiac_sign spark_dataframe_dos_country spark_dataframe_dos_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Join -> anti_outer_join
spark_dataframe_uno_id spark_dataframe_uno_name spark_dataframe_uno_age spark_dataframe_uno_related_id spark_dataframe_uno_zodiac_sign spark_dataframe_uno_country spark_dataframe_uno_values spark_dataframe_dos_id spark_dataframe_dos_name spark_dataframe_dos_age spark_dataframe_dos_related_id spark_dataframe_dos_zodiac_sign spark_dataframe_dos_country spark_dataframe_dos_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Join -> cross_join
spark_dataframe_uno_id spark_dataframe_uno_name spark_dataframe_uno_age spark_dataframe_uno_related_id spark_dataframe_uno_zodiac_sign spark_dataframe_uno_country spark_dataframe_uno_values spark_dataframe_dos_id spark_dataframe_dos_name spark_dataframe_dos_age spark_dataframe_dos_related_id spark_dataframe_dos_zodiac_sign spark_dataframe_dos_country spark_dataframe_dos_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Join -> intersect
spark_dataframe_uno_id spark_dataframe_uno_name spark_dataframe_uno_age spark_dataframe_uno_related_id spark_dataframe_uno_zodiac_sign spark_dataframe_uno_country spark_dataframe_uno_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Join -> except
spark_dataframe_uno_id spark_dataframe_uno_name spark_dataframe_uno_age spark_dataframe_uno_related_id spark_dataframe_uno_zodiac_sign spark_dataframe_uno_country spark_dataframe_uno_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Join -> union
spark_dataframe_uno_id spark_dataframe_uno_name spark_dataframe_uno_age spark_dataframe_uno_related_id spark_dataframe_uno_zodiac_sign spark_dataframe_uno_country spark_dataframe_uno_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Join -> union_all
spark_dataframe_uno_id spark_dataframe_uno_name spark_dataframe_uno_age spark_dataframe_uno_related_id spark_dataframe_uno_zodiac_sign spark_dataframe_uno_country spark_dataframe_uno_values spark_dataframe_dos_id spark_dataframe_dos_name spark_dataframe_dos_age spark_dataframe_dos_related_id spark_dataframe_dos_zodiac_sign spark_dataframe_dos_country spark_dataframe_dos_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Join -> self
spark_dataframe_uno_id spark_dataframe_uno_name spark_dataframe_uno_age spark_dataframe_uno_related_id spark_dataframe_uno_zodiac_sign spark_dataframe_uno_country spark_dataframe_uno_values spark_dataframe_uno_id spark_dataframe_uno_name spark_dataframe_uno_age spark_dataframe_uno_related_id spark_dataframe_uno_zodiac_sign spark_dataframe_uno_country spark_dataframe_uno_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

In [87]:
list_all_databases_and_tables(spark_session)
The following databases and tables are present in the Spark Catalog.

Database: default, Table: delta_sql

Database: delta_spark_database, Table: delta_dataframe_1

Database: delta_spark_database, Table: delta_dataframe_2

Database: delta_spark_database, Table: delta_dataframe_all

Database: delta_spark_database, Table: delta_dataframe_dos

Database: delta_spark_database, Table: delta_dataframe_uno

Out[87]:
{'default': ['delta_sql'],
 'delta_spark_database': ['delta_dataframe_1',
  'delta_dataframe_2',
  'delta_dataframe_all',
  'delta_dataframe_dos',
  'delta_dataframe_uno']}

CRUD Operations in Spark

In [88]:
file_path_delta_1 = 'warehouse-spark/spark_catalog/database/delta_spark_database.db/delta_dataframe_uno'

INSERT¶

In [89]:
# Call the function to insert data into the Delta table within the 'delta_spark_database' database
insert_into_delta_table(spark_session, "delta_spark_database", "delta_dataframe_uno", spark_dataframe_dos)

write_into_delta_lake(spark_session, file_path_delta_1, spark_dataframe_dos)
Records inserted into Delta table: delta_spark_database.delta_dataframe_uno
No new rows to append.

UPDATE¶

In [90]:
update_in_delta_table(spark_session, "delta_spark_database", "delta_dataframe_uno", 
                      "zodiac_sign = 'Capricorn'", {"zodiac_sign": "Capricornio"})

update_in_delta_table(spark_session, "delta_spark_database", "delta_dataframe_uno",
                      "zodiac_sign = 'Cancer'", {"zodiac_sign": "Capricornio"})

update_from_delta_lake(spark_session, file_path_delta_1, "age >= 15", {"age": "age * 1.5", "name": "UPPER(name)"})
Records matching condition 'zodiac_sign = 'Capricorn'' updated in Delta table: delta_spark_database.delta_dataframe_uno
Records matching condition 'zodiac_sign = 'Cancer'' updated in Delta table: delta_spark_database.delta_dataframe_uno
Updated records with condition 'age >= 15'.
Out[90]:
format id name description location createdAt lastModified partitionColumns clusteringColumns numFiles sizeInBytes properties minReaderVersion minWriterVersion tableFeatures
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

DELETE¶

In [91]:
delete_condition = "id > 20"

# Call the function
delete_from_delta_table(spark_session, "delta_spark_database", "delta_dataframe_uno", delete_condition)

delete_from_delta_lake(spark_session, file_path_delta_1, 'age >= 40')
Records matching condition 'id > 20' deleted from Delta table: delta_spark_database.delta_dataframe_uno
No records match the condition 'age >= 40'.
Out[91]:
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

RESTORE VERSION¶

In [92]:
restore_delta_lake_to_version(spark_session, file_path_delta_1, 1)
24/12/27 20:16:41 WARN DAGScheduler: Broadcasting large task binary with size 1078.9 KiB
Restored to version 1.
Out[92]:
format id name description location createdAt lastModified partitionColumns clusteringColumns numFiles sizeInBytes properties minReaderVersion minWriterVersion tableFeatures
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

MERGE¶

In [93]:
# Sample data
data0 = [
    {
        "id": 23, "name": "Sun", "age": 13, "related_id": 7, "zodiac_sign": "Leo", "country": "Brazil", "values": [31, 13, 3, 7, 11]
    },
    {
        "id": 24, "name": "Moon", "age": 31, "related_id": 7, "zodiac_sign": "Capricorn", "country": "Bolivia", "values": [12, 13, 27, 31]
    },
    {
        "id": 25, "name": "Earth", "age": 25, "related_id": 7, "zodiac_sign": "Taurus", "country": "USA", "values": [15, 28, 7]
    },
    {
        "id": 26, "name": "Mars", "age": 35, "related_id": 8, "zodiac_sign": "Aries", "country": "Argentina", "values": [18, 21, 4]
    }, 
]

data1 = [
    {
        "id": 27, "name": "Venus", "age": 29, "related_id": 5, "zodiac_sign": "Libra", "country": "Argentina", "values": [19, 25, 8, 11]
    },
    {
        "id": 28, "name": "Jupiter", "age": 45, "related_id": 6, "zodiac_sign": "Sagittarius", "country": "Brazil", "values": [23, 30, 12]
    },
    {
        "id": 29, "name": "Saturn", "age": 60, "related_id": 3, "zodiac_sign": "Aquarius", "country": "Peru", "values": [17, 35, 22, 40]
    },
    {
        "id": 30, "name": "Uranus", "age": 28, "related_id": 9, "zodiac_sign": "Scorpio", "country": "Uruguay", "values": [20, 26, 14]
    },
    {
        "id": 31, "name": "Neptune", "age": 50, "related_id": 4, "zodiac_sign": "Pisces", "country": "Boliva", "values": [29, 19, 33]
    }
]

data2 = [
    {
        "id": 32, "name": "Pluto", "age": 15, "related_id": 2, "zodiac_sign": "Cancer", "country": "Peru", "values": [8, 13, 5]
    },
    {
        "id": 33, "name": "Ceres", "age": 38, "related_id": 10, "zodiac_sign": "Virgo", "country": "Brazil", "values": [21, 18, 6]
    },
    {
        "id": 34, "name": "Eris", "age": 22, "related_id": 11, "zodiac_sign": "Gemini", "country": "Peru", "values": [15, 25, 12]
    },
    {
        "id": 35, "name": "Haumea", "age": 19, "related_id": 1, "zodiac_sign": "Aries", "country": "Mexico", "values": [10, 14, 7, 9]
    },
    {
        "id": 36, "name": "Makemake", "age": 27, "related_id": 12, "zodiac_sign": "Taurus", "country": "Colombia", "values": [18, 22, 11]
    }
]

# Define schema for data (this step is optional, but ensures correct data types)
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("related_id", IntegerType(), True),
    StructField("zodiac_sign", StringType(), True),
    StructField("country", StringType(), True),
    StructField("values", ArrayType(IntegerType()), True),
])

# Create a DataFrame from the sample data

for data in [data0, data1, data2]:
    new_data_df = spark_session.createDataFrame(data, schema)
    display(merge_from_delta_lake(spark_session, file_path_delta_1, new_data_df, "id"))
Merge operation completed successfully.
None
Merge operation completed successfully.
None
Merge operation completed successfully.
None

INSERT DATA AVOING DUPLICATES¶

In [94]:
data2 = [
    {
        "id": 32, "name": "Pluto", "age": 15, "related_id": 2, "zodiac_sign": "Cancer", "country": "Peru", "values": [8, 13, 5]
    },
    {
        "id": 33, "name": "Ceres", "age": 38, "related_id": 10, "zodiac_sign": "Virgo", "country": "Brazil", "values": [21, 18, 6]
    },
    {
        "id": 99, "name": "Eris", "age": 22, "related_id": 11, "zodiac_sign": "Gemini", "country": "Peru", "values": [15, 25, 12]
    },
    {
        "id": 35, "name": "Haumea", "age": 19, "related_id": 1, "zodiac_sign": "Aries", "country": "Mexico", "values": [10, 14, 7, 9]
    },
    {
        "id": 36, "name": "Makemake", "age": 27, "related_id": 12, "zodiac_sign": "Taurus", "country": "Colombia", "values": [18, 22, 11]
    }
]


for data in [data0, data1, data2]:
    new_data_df = spark_session.createDataFrame(data, schema)
    display(write_into_delta_lake(spark_session, file_path_delta_1, new_data_df))
No new rows to append.
None
No new rows to append.
None
Added new data without duplicates.
None

EVOLUTION SCHEMA¶

In [95]:
evolution_schema = [
    {
        "id": 36, "name": "Makemake", "age": 27, "related_id": 12, "zodiac_sign": "Taurus", "country": "Colombia", "values": [18, 22, 11], "color":"Blue"
    },
    {
        "id": 1227, "name": "Pluto", "age": 15, "related_id": 2, "zodiac_sign": "Cancer", "country": "Peru", "values": [8, 13, 5], "fruit": "Guama"
    }
]

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("related_id", IntegerType(), True),
    StructField("zodiac_sign", StringType(), True),
    StructField("country", StringType(), True),
    StructField("values", ArrayType(IntegerType()), True),
    StructField("color", StringType(), True),
    StructField("fruit", StringType(), True),
])

for data in [evolution_schema]:
    new_data_df = spark_session.createDataFrame(data, schema)
    display(write_into_delta_lake(spark_session, file_path_delta_1, new_data_df))


evolution_schema_2 = [
    {
    "id": 7831, "name": "Pluto", "age": 15, "related_id": 2, "zodiac_sign": "Capricorn", "country": "Peru", "values": [8, 13, 5], "continent": "America"
    },
        {
    "id": 7831, "name": "Pluto", "age": 15, "related_id": 2, "zodiac_sign": "Leo", "country": "Peru", "values": [8, 13, 5], "continent": "America", "sport":"Tennis" 
    }
]

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("related_id", IntegerType(), True),
    StructField("zodiac_sign", StringType(), True),
    StructField("country", StringType(), True),
    StructField("values", ArrayType(IntegerType()), True),
    StructField("continent", StringType(), True),
])

for data in [evolution_schema_2]:
    new_data_df = spark_session.createDataFrame(data, schema)
    merge_from_delta_lake(spark_session, file_path_delta_1, new_data_df, "id")
Added new data without duplicates.
None
Merge operation completed successfully.

Review the version history to explore Delta Lake's time travel functionality.¶

In [96]:
historic_version = show_historic_version_from_delta_file(spark_session, file_path_delta_1)
historic_version
Out[96]:
version timestamp userId userName operation operationParameters job notebook clusterId readVersion isolationLevel isBlindAppend operationMetrics userMetadata engineInfo
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

CONTENT CHANGES BY VERSION Method 1 - History Version¶

In [97]:
total_versions = max(historic_version.version)
total_operations = historic_version.operation.to_list()
total_operations.reverse()

for version in range(total_versions + 1):
    print(f"Version : {version}, Operation : {total_operations[version]}, Content")
    display(show_historic_version_from_delta_file(spark_session, file_path_delta_1, version, None, 'id'))
    print()
Version : 0, Operation : CREATE OR REPLACE TABLE AS SELECT, Content
id name age related_id zodiac_sign country values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 1, Operation : SET TBLPROPERTIES, Content
age country id name related_id values zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 2, Operation : WRITE, Content
age country id name related_id values zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 3, Operation : OPTIMIZE, Content
age country id name related_id values zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 4, Operation : UPDATE, Content
age country id name related_id values zodiac_sign ChangeType
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 5, Operation : UPDATE, Content
age country id name related_id values zodiac_sign ChangeType
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 6, Operation : UPDATE, Content
age country id name related_id values zodiac_sign ChangeType
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 7, Operation : DELETE, Content
age country id name related_id values zodiac_sign ChangeType
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 8, Operation : RESTORE, Content
age country id name related_id values zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 9, Operation : MERGE, Content
age country id name related_id values zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 10, Operation : OPTIMIZE, Content
age country id name related_id values zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 11, Operation : MERGE, Content
age country id name related_id values zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 12, Operation : MERGE, Content
age country id name related_id values zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 13, Operation : OPTIMIZE, Content
age country id name related_id values zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 14, Operation : WRITE, Content
age country id name related_id values zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 15, Operation : OPTIMIZE, Content
age country id name related_id values zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 16, Operation : WRITE, Content
age color country fruit id name related_id values zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 17, Operation : OPTIMIZE, Content
age color country fruit id name related_id values zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 18, Operation : MERGE, Content
age color continent country fruit id name related_id values zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

CONTENT CHANGES BY VERSION Method 2 - Change Data Feed Version Validation and Control¶

In [98]:
total_versions = max(historic_version.version)
total_operations = historic_version.operation.to_list()
total_operations.reverse()

for version in range(total_versions + 1):
    print(f"Version : {version}, Operation : {total_operations[version]}, Content")
    display(read_delta_table_with_change_data_control(spark_session, file_path_delta_1, version, version))
    print()
Version : 0, Operation : CREATE OR REPLACE TABLE AS SELECT, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 1, Operation : SET TBLPROPERTIES, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 2, Operation : WRITE, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 3, Operation : OPTIMIZE, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 4, Operation : UPDATE, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 5, Operation : UPDATE, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 6, Operation : UPDATE, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 7, Operation : DELETE, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 8, Operation : RESTORE, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 9, Operation : MERGE, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 10, Operation : OPTIMIZE, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 11, Operation : MERGE, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 12, Operation : MERGE, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 13, Operation : OPTIMIZE, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 14, Operation : WRITE, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 15, Operation : OPTIMIZE, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 16, Operation : WRITE, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 17, Operation : OPTIMIZE, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 18, Operation : MERGE, Content
id name age related_id zodiac_sign country values color fruit continent _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Review the version history FROM FILE¶

DESCARGAR ARCHIVOS¶

In [99]:
import pandas as pd
import os
import gc

# Path for the directory
file_path = "documents/"
file_text_1 = "https://raw.githubusercontent.com/JorgeCardona/recursos/refs/heads/main/delta-spark/documents/IngenieriaDeDatos.txt"
file_text_2 = "https://raw.githubusercontent.com/JorgeCardona/recursos/refs/heads/main/delta-spark/documents/DeltaLake.txt"
file_text_3 = "https://raw.githubusercontent.com/JorgeCardona/recursos/refs/heads/main/delta-spark/documents/MLOps.txt"

# Create the directory if it doesn't exist
os.makedirs(file_path, exist_ok=True)

for file_url in [file_text_1, file_text_2, file_text_3]:
    # Read the text file from the URL
    pandas_df = pd.read_csv(file_url, encoding="utf-8")

    # Get the file name
    file_name = file_url.split("/")[-1]
    print(file_name)

    # Path to save the file
    save_path = f"{file_path}/{file_name}"

    # Save the text file without modifications
    pandas_df.to_csv(save_path, index=False, encoding="utf-8")

    print(f"File downloaded: {file_name} and saved in the directory -> {save_path}")

# Release memory by deleting the DataFrame
del pandas_df
gc.collect()  # Force garbage collection

print("Pandas DataFrame removed from memory.")
IngenieriaDeDatos.txt
File downloaded: IngenieriaDeDatos.txt and saved in the directory -> documents//IngenieriaDeDatos.txt
DeltaLake.txt
File downloaded: DeltaLake.txt and saved in the directory -> documents//DeltaLake.txt
MLOps.txt
File downloaded: MLOps.txt and saved in the directory -> documents//MLOps.txt
Pandas DataFrame removed from memory.

PROCESAR ARCHIVOS¶

In [100]:
# Read the text file as a Spark DataFrame
df_text_1 = spark_session.read.text("documents/IngenieriaDeDatos.txt")
df_text_2 = spark_session.read.text("documents/DeltaLake.txt")
df_text_3 = spark_session.read.text("documents/MLOps.txt")

delta_lake_text_path = "warehouse-spark/spark_files/delta_parquet/texto"

# Save the combined DataFrame to a Delta table
for new_data_df in [df_text_1, df_text_2, df_text_3]:
    display(write_into_delta_lake(spark_session, delta_lake_text_path, new_data_df))

# Read the data from the Delta Lake table and convert it to a Pandas DataFrame
spark_session.read.format("delta").load(delta_lake_text_path).toPandas()
warehouse-spark/spark_files/delta_parquet/texto does not contain a Delta table.
Created Delta table with new data.
None
Added new data without duplicates.
None
Added new data without duplicates.
None
Out[100]:
value
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Method 1 - History Version¶

In [101]:
historic_version_file = show_historic_version_from_delta_file(spark_session, delta_lake_text_path)
historic_version_file
Out[101]:
version timestamp userId userName operation operationParameters job notebook clusterId readVersion isolationLevel isBlindAppend operationMetrics userMetadata engineInfo
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
In [102]:
total_versions_file = max(historic_version_file.version)
total_operations_file = historic_version_file.operation.to_list()
total_operations_file.reverse()

for version in range(total_versions_file + 1):
    print(f"Version : {version}, Operation : {total_operations_file[version]}, Content")
    display(show_historic_version_from_delta_file(spark_session, delta_lake_text_path, version))
    print()
Version : 0, Operation : WRITE, Content
Warning: Column 'None' not found in the DataFrame.
value
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 1, Operation : WRITE, Content
Warning: Column 'None' not found in the DataFrame.
value
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 2, Operation : OPTIMIZE, Content
Warning: Column 'None' not found in the DataFrame.
value
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 3, Operation : WRITE, Content
Warning: Column 'None' not found in the DataFrame.
value
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Method 2 - Change Data Feed Version Validation and Control¶

In [103]:
total_versions_file = max(historic_version_file.version)
total_operations_file = historic_version_file.operation.to_list()
total_operations_file.reverse()

for version in range(total_versions_file + 1):
    print(f"Version : {version}, Operation : {total_operations_file[version]}, Content")
    display(read_delta_table_with_change_data_control(spark_session, delta_lake_text_path, version, version))
    print()
Version : 0, Operation : WRITE, Content
value _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 1, Operation : WRITE, Content
value _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 2, Operation : OPTIMIZE, Content
value _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 3, Operation : WRITE, Content
value _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

DELTA STREAMING¶

In [104]:
import pandas as pd
import numpy as np
from time import sleep
import os

def process_and_save_files(
    csv_streaming_path, 
    base_streaming_directory, 
    metadata_streaming_path, 
    delta_output_streaming_path, 
    file_name='flight_logs', 
    num_files=5, 
    sleep_time=10, 
    raw_url_base='https://raw.githubusercontent.com/JorgeCardona/recursos/main/datasets/', 
    file_format='csv', 
    multiline_json=False
):
    """
    Processes and saves streaming files in the specified formats (csv, json, or parquet), merging data from two parts
    based on the 'id' or 'flight_id' column, and saves the final merged file.

    Parameters:
    csv_streaming_path (str): Path to save the processed CSV files.
    base_streaming_directory (str): Base directory for file storage.
    metadata_streaming_path (str): Path for metadata storage.
    delta_output_streaming_path (str): Path to save Delta format files.
    file_name (str, optional): Base name for the files. Default is 'flight_logs'.
    num_files (int, optional): Number of files to process. Default is 5.
    sleep_time (int, optional): Time in seconds to wait before processing the next file. Default is 10 seconds.
    raw_url_base (str, optional): Base URL for downloading files. Default is a GitHub URL.
    file_format (str, optional): File format to process ('csv', 'json', 'parquet'). Default is 'csv'.
    multiline_json (bool, optional): Whether to handle JSON files as multiline. Default is False.

    Raises:
    ValueError: If the file format is unsupported.
    """
    # Create directories if they do not exist
    os.makedirs(csv_streaming_path, exist_ok=True)
    os.makedirs(metadata_streaming_path, exist_ok=True)
    os.makedirs(delta_output_streaming_path, exist_ok=True)

    # Process files
    for index in range(1, num_files + 1):
        final_file = f'{csv_streaming_path}/{file_name}_{index}.{file_format}'

        # Download and read the first part of the file in the appropriate format
        file_1 = f'{file_name}_part_1_{index}.{file_format}'
        base_url1 = f'{raw_url_base}{file_1}'

        if file_format == 'csv':
            df1 = pd.read_csv(base_url1)
        elif file_format == 'json':
            if multiline_json:
                df1 = pd.read_json(base_url1, lines=True)  # For multiline JSON
            else:
                df1 = pd.read_json(base_url1)  # For JSON in list
        elif file_format == 'parquet':
            df1 = pd.read_parquet(base_url1)
        else:
            raise ValueError(f"Unsupported file format {file_format}")

        # Download and read the second part of the file in the appropriate format
        file_2 = f'{file_name}_part_2_{index}.{file_format}'
        base_url2 = f'{raw_url_base}{file_2}'

        if file_format == 'csv':
            df2 = pd.read_csv(base_url2)
        elif file_format == 'json':
            if multiline_json:
                df2 = pd.read_json(base_url2, lines=True)  # For multiline JSON
            else:
                df2 = pd.read_json(base_url2)  # For JSON in list
        elif file_format == 'parquet':
            df2 = pd.read_parquet(base_url2)
        else:
            raise ValueError(f"Unsupported file format {file_format}")

        # Merge the data on the 'id' or 'flight_id' column
        df3 = pd.merge(df1, df2, left_on='id', right_on='flight_id', how='inner')

        # Save the final file in the appropriate format
        if file_format == 'csv':
            df3.to_csv(f'{final_file}', index=False)
        elif file_format == 'json':
            df3.to_json(f'{final_file}', orient='records', lines=True)
        elif file_format == 'parquet':
            df3.to_parquet(f'{final_file}')
        else:
            raise ValueError(f"Unsupported file format {file_format}")

        print(f'{final_file} saved Successfully!!')
        
        # Pause for the specified time
        sleep(sleep_time)

DEFINE THE STRUCTURE OF THE DATA TO BE READ, TO ASSIGN THE APPROPRIATE SCHEMA¶

In [105]:
import shutil
from datetime import datetime

from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, lit
from pyspark.sql.types import (
    DoubleType,
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Define the schema
customSchema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("secure_code", StringType(), True),
        StructField("airline", StringType(), True),
        StructField("departure_city", StringType(), True),
        StructField("departure_date", StringType(), True),
        StructField("arrival_airport", StringType(), True),
        StructField("arrival_city", StringType(), True),
        StructField("arrival_time", StringType(), True),
        StructField("passenger_name", StringType(), True),
        StructField("passenger_gender", StringType(), True),
        StructField("seat_number", StringType(), True),
        StructField("currency", StringType(), True),
        StructField("departure_gate", StringType(), True),
        StructField("flight_status", StringType(), True),
        StructField("co_pilot_name", StringType(), True),
        StructField("aircraft_type", StringType(), True),
        StructField("fuel_consumption", DoubleType(), True),
        StructField("flight_id", IntegerType(), True),
        StructField("flight_number", IntegerType(), True),
        StructField("departure_airport", StringType(), True),
        StructField("departure_country", StringType(), True),
        StructField("departure_time", StringType(), True),
        StructField("arrival_country", StringType(), True),
        StructField("arrival_date", StringType(), True),
        StructField("flight_duration", DoubleType(), True),
        StructField("passenger_age", IntegerType(), True),
        StructField("passenger_nationality", StringType(), True),
        StructField("ticket_price", DoubleType(), True),
        StructField("baggage_weight", DoubleType(), True),
        StructField("arrival_gate", StringType(), True),
        StructField("pilot_name", StringType(), True),
        StructField("cabin_crew_count", IntegerType(), True),
        StructField("aircraft_registration", StringType(), True),
        StructField("flight_distance", DoubleType(), True),
    ]
)

EXECUTE THE DELTA STREAMING VERSION¶

In [106]:
import time
from datetime import datetime
from pyspark.sql.functions import upper

def process_streaming_files(
    spark_session,
    input_path,
    output_path,
    checkpoint_path,
    file_format="csv",
    custom_schema=None,
    header=True,
    output_mode="append",
    idle_timeout=20,
    multiline_json=False,  # For multiline JSON
    capitalized_column="departure_city",  # Parameter for the column to capitalize
    replace_chars_with_numbers_column="airline"  # Parameter for the column to replace characters by numbers
):
    """
    Processes streaming files from the specified input path, handles different file formats (csv, json, parquet),
    and writes the processed data to an output path in Delta format. It also handles checkpoints, manages idle timeout,
    and performs data transformations such as capitalizing specified columns and replacing characters with similar numbers.

    Parameters:
    spark_session (SparkSession): The active Spark session used to run the streaming job.
    input_path (str): The path where the streaming files are located.
    output_path (str): The path where the processed Delta files will be written.
    checkpoint_path (str): The path to store the checkpoint data.
    file_format (str, optional): The format of the input files ('csv', 'json', or 'parquet'). Default is 'csv'.
    custom_schema (StructType, optional): The schema to apply to the incoming data. Default is None.
    header (bool, optional): Whether to include the header in CSV files. Default is True.
    output_mode (str, optional): The output mode for the streaming job. Default is 'append'.
    idle_timeout (int, optional): The timeout in seconds to stop the stream if no new data is processed. Default is 20 seconds.
    multiline_json (bool, optional): Whether to handle JSON files as multiline. Default is False.
    capitalized_column (str, optional): The name of the column whose values should be capitalized. Default is 'departure_city'.
    replace_chars_with_numbers (str, optional): The name of the column whose characters should be replaced with similar-looking numbers. Default is 'airline'.

    Raises:
    ValueError: If the file format is unsupported.
    """
    
    # Set up the stream reader depending on the file format
    stream_reader = spark_session.readStream.format(file_format)
    
    if file_format == "csv":
        stream_reader = stream_reader.option("header", str(header).lower())
    elif file_format == "json":
        if multiline_json:
            stream_reader = stream_reader.option("multiline", "true")  # For multiline JSON
        else:
            stream_reader = stream_reader.option("multiline", "false")  # For JSON by line
    elif file_format == "parquet":
        # No additional options needed for Parquet
        pass
    else:
        raise ValueError(f"Unsupported file format {file_format}")

    # Apply custom schema if provided
    if custom_schema:
        stream_reader = stream_reader.schema(custom_schema)

    # Read the stream from the input path
    input_stream = stream_reader.load(input_path)
    
    # DATA TRANSFORMATIONs
    # Capitalize the specified column if it exists in the DataFrame
    if capitalized_column in input_stream.columns:
        input_stream = input_stream.withColumn(
            capitalized_column, 
            to_uppercase_spark_dataframe(input_stream[capitalized_column])
        )
    
    # Replace characters with similar numbers for another specified column if it exists in the DataFrame
    if replace_chars_with_numbers_column in input_stream.columns:
        input_stream = input_stream.withColumn(
            replace_chars_with_numbers_column, 
            replace_chars_with_similar_numbers(input_stream[replace_chars_with_numbers_column])
        )

    # Write the stream to Delta format with checkpointing
    output_stream = input_stream.writeStream \
        .format("delta") \
        .outputMode(output_mode) \
        .option("checkpointLocation", checkpoint_path) \
        .trigger(processingTime="10 seconds")  # Process every 10 seconds

    # Start the streaming query
    query = output_stream.start(output_path)

    last_processed_time = time.time()  # Initialize the last processed time

    while query.isActive:
        # Get the last progress of the stream
        last_progress = query.lastProgress
        if last_progress:
            processing_time = last_progress.get("processedRowsPerSecond", 0)
            print(f"Last progress: {last_progress}, Processed per second: {processing_time}")

            if processing_time == 0:
                # If no new rows have been processed, check the idle time
                time_since_last_processed = time.time() - last_processed_time
                print(f"Time since last data processed: {time_since_last_processed} seconds.")
                
                if time_since_last_processed > idle_timeout:
                    print(f"No new data for {idle_timeout} seconds. Stopping the stream.")
                    query.stop()
                    break

            else:
                last_processed_time = time.time()  # Update the time when data is processed
        
        # Sleep briefly before checking again
        time.sleep(5)

    print("Stream has stopped.")

DATA PROCESSING IN STREAMING WITH THREADS

In [107]:
import threading

# Function to execute 'process_and_save_files'
def run_process_and_save_files():
    """
    Runs the process_and_save_files function to download, process, and save files in the specified directories.
    """
    process_and_save_files(
        csv_streaming_path="csv_streaming_files/",
        base_streaming_directory="warehouse-spark/delta_spark_streaming",
        metadata_streaming_path="warehouse-spark/delta_spark_streaming/metadata",
        delta_output_streaming_path="warehouse-spark/delta_spark_streaming/proccesed",
        file_name="flight_logs",
        num_files=5,
        sleep_time=10,
        raw_url_base="https://raw.githubusercontent.com/JorgeCardona/recursos/main/datasets/",
    )

# Function to execute 'process_streaming_files'
def run_process_streaming_files():
    """
    Runs the process_streaming_files function to process streaming data and write it to a Delta table.
    """
    process_streaming_files(
        spark_session=spark_session,
        input_path=csv_streaming_path,
        output_path=delta_output_streaming_path,
        checkpoint_path=metadata_streaming_path,
        file_format="csv",
        custom_schema=customSchema,
        header=True,
        output_mode="append",
        idle_timeout=20,
        capitalized_column="departure_city",
        replace_chars_with_numbers_column="airline",
    )

# Define directories for processing
csv_streaming_path = "csv_streaming_files/"
base_streaming_directory = "warehouse-spark/delta_spark_streaming"
metadata_streaming_path = f"{base_streaming_directory}/metadata"
delta_output_streaming_path = f"{base_streaming_directory}/proccesed"

# Create threads to run both functions in parallel
process_and_save_files_thread = threading.Thread(target=run_process_and_save_files)
process_streaming_files_thread = threading.Thread(target=run_process_streaming_files)

# Start both threads in parallel
process_and_save_files_thread.start()
process_streaming_files_thread.start()

# Wait for both threads to finish before continuing with the next cell
process_and_save_files_thread.join()
process_streaming_files_thread.join()

print("Both processes have finished.")
24/12/27 20:18:02 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
csv_streaming_files//flight_logs_1.csv saved Successfully!!
Last progress: {'id': 'ec94b90f-41a7-40d8-9554-73ad9a1a566f', 'runId': '0e47383f-3a33-4315-9b61-abdff0474301', 'name': None, 'timestamp': '2024-12-27T20:18:02.291Z', 'batchId': 0, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'durationMs': {'latestOffset': 3, 'triggerExecution': 9}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': None, 'endOffset': None, 'latestOffset': None, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 0.0
Time since last data processed: 5.012174844741821 seconds.
Last progress: {'id': 'ec94b90f-41a7-40d8-9554-73ad9a1a566f', 'runId': '0e47383f-3a33-4315-9b61-abdff0474301', 'name': None, 'timestamp': '2024-12-27T20:18:10.000Z', 'batchId': 0, 'numInputRows': 1000, 'inputRowsPerSecond': 129.71851083149565, 'processedRowsPerSecond': 803.8585209003215, 'durationMs': {'addBatch': 1122, 'commitOffsets': 18, 'getBatch': 18, 'latestOffset': 21, 'queryPlanning': 35, 'triggerExecution': 1244, 'walCommit': 22}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': None, 'endOffset': {'logOffset': 0}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 129.71851083149565, 'processedRowsPerSecond': 803.8585209003215}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 803.8585209003215
csv_streaming_files//flight_logs_2.csv saved Successfully!!
Last progress: {'id': 'ec94b90f-41a7-40d8-9554-73ad9a1a566f', 'runId': '0e47383f-3a33-4315-9b61-abdff0474301', 'name': None, 'timestamp': '2024-12-27T20:18:10.000Z', 'batchId': 0, 'numInputRows': 1000, 'inputRowsPerSecond': 129.71851083149565, 'processedRowsPerSecond': 803.8585209003215, 'durationMs': {'addBatch': 1122, 'commitOffsets': 18, 'getBatch': 18, 'latestOffset': 21, 'queryPlanning': 35, 'triggerExecution': 1244, 'walCommit': 22}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': None, 'endOffset': {'logOffset': 0}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 129.71851083149565, 'processedRowsPerSecond': 803.8585209003215}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 803.8585209003215
Last progress: {'id': 'ec94b90f-41a7-40d8-9554-73ad9a1a566f', 'runId': '0e47383f-3a33-4315-9b61-abdff0474301', 'name': None, 'timestamp': '2024-12-27T20:18:20.001Z', 'batchId': 1, 'numInputRows': 1000, 'inputRowsPerSecond': 99.99000099990002, 'processedRowsPerSecond': 2604.1666666666665, 'durationMs': {'addBatch': 322, 'commitOffsets': 15, 'getBatch': 5, 'latestOffset': 17, 'queryPlanning': 7, 'triggerExecution': 384, 'walCommit': 16}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 0}, 'endOffset': {'logOffset': 1}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 99.99000099990002, 'processedRowsPerSecond': 2604.1666666666665}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 2604.1666666666665
csv_streaming_files//flight_logs_3.csv saved Successfully!!
Last progress: {'id': 'ec94b90f-41a7-40d8-9554-73ad9a1a566f', 'runId': '0e47383f-3a33-4315-9b61-abdff0474301', 'name': None, 'timestamp': '2024-12-27T20:18:20.001Z', 'batchId': 1, 'numInputRows': 1000, 'inputRowsPerSecond': 99.99000099990002, 'processedRowsPerSecond': 2604.1666666666665, 'durationMs': {'addBatch': 322, 'commitOffsets': 15, 'getBatch': 5, 'latestOffset': 17, 'queryPlanning': 7, 'triggerExecution': 384, 'walCommit': 16}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 0}, 'endOffset': {'logOffset': 1}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 99.99000099990002, 'processedRowsPerSecond': 2604.1666666666665}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 2604.1666666666665
Last progress: {'id': 'ec94b90f-41a7-40d8-9554-73ad9a1a566f', 'runId': '0e47383f-3a33-4315-9b61-abdff0474301', 'name': None, 'timestamp': '2024-12-27T20:18:30.000Z', 'batchId': 2, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0100010001, 'processedRowsPerSecond': 1278.772378516624, 'durationMs': {'addBatch': 724, 'commitOffsets': 16, 'getBatch': 4, 'latestOffset': 17, 'queryPlanning': 5, 'triggerExecution': 782, 'walCommit': 14}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 1}, 'endOffset': {'logOffset': 2}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0100010001, 'processedRowsPerSecond': 1278.772378516624}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 1278.772378516624
csv_streaming_files//flight_logs_4.csv saved Successfully!!
Last progress: {'id': 'ec94b90f-41a7-40d8-9554-73ad9a1a566f', 'runId': '0e47383f-3a33-4315-9b61-abdff0474301', 'name': None, 'timestamp': '2024-12-27T20:18:30.000Z', 'batchId': 2, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0100010001, 'processedRowsPerSecond': 1278.772378516624, 'durationMs': {'addBatch': 724, 'commitOffsets': 16, 'getBatch': 4, 'latestOffset': 17, 'queryPlanning': 5, 'triggerExecution': 782, 'walCommit': 14}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 1}, 'endOffset': {'logOffset': 2}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0100010001, 'processedRowsPerSecond': 1278.772378516624}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 1278.772378516624
Last progress: {'id': 'ec94b90f-41a7-40d8-9554-73ad9a1a566f', 'runId': '0e47383f-3a33-4315-9b61-abdff0474301', 'name': None, 'timestamp': '2024-12-27T20:18:40.001Z', 'batchId': 3, 'numInputRows': 1000, 'inputRowsPerSecond': 99.99000099990002, 'processedRowsPerSecond': 1358.695652173913, 'durationMs': {'addBatch': 680, 'commitOffsets': 15, 'getBatch': 4, 'latestOffset': 16, 'queryPlanning': 4, 'triggerExecution': 736, 'walCommit': 14}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 2}, 'endOffset': {'logOffset': 3}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 99.99000099990002, 'processedRowsPerSecond': 1358.695652173913}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 1358.695652173913
csv_streaming_files//flight_logs_5.csv saved Successfully!!
Last progress: {'id': 'ec94b90f-41a7-40d8-9554-73ad9a1a566f', 'runId': '0e47383f-3a33-4315-9b61-abdff0474301', 'name': None, 'timestamp': '2024-12-27T20:18:40.001Z', 'batchId': 3, 'numInputRows': 1000, 'inputRowsPerSecond': 99.99000099990002, 'processedRowsPerSecond': 1358.695652173913, 'durationMs': {'addBatch': 680, 'commitOffsets': 15, 'getBatch': 4, 'latestOffset': 16, 'queryPlanning': 4, 'triggerExecution': 736, 'walCommit': 14}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 2}, 'endOffset': {'logOffset': 3}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 99.99000099990002, 'processedRowsPerSecond': 1358.695652173913}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 1358.695652173913
Last progress: {'id': 'ec94b90f-41a7-40d8-9554-73ad9a1a566f', 'runId': '0e47383f-3a33-4315-9b61-abdff0474301', 'name': None, 'timestamp': '2024-12-27T20:18:50.001Z', 'batchId': 4, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1270.6480304955526, 'durationMs': {'addBatch': 729, 'commitOffsets': 14, 'getBatch': 5, 'latestOffset': 19, 'queryPlanning': 5, 'triggerExecution': 787, 'walCommit': 14}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 3}, 'endOffset': {'logOffset': 4}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1270.6480304955526}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 1270.6480304955526
Last progress: {'id': 'ec94b90f-41a7-40d8-9554-73ad9a1a566f', 'runId': '0e47383f-3a33-4315-9b61-abdff0474301', 'name': None, 'timestamp': '2024-12-27T20:18:50.001Z', 'batchId': 4, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1270.6480304955526, 'durationMs': {'addBatch': 729, 'commitOffsets': 14, 'getBatch': 5, 'latestOffset': 19, 'queryPlanning': 5, 'triggerExecution': 787, 'walCommit': 14}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 3}, 'endOffset': {'logOffset': 4}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1270.6480304955526}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 1270.6480304955526
Last progress: {'id': 'ec94b90f-41a7-40d8-9554-73ad9a1a566f', 'runId': '0e47383f-3a33-4315-9b61-abdff0474301', 'name': None, 'timestamp': '2024-12-27T20:18:50.001Z', 'batchId': 4, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1270.6480304955526, 'durationMs': {'addBatch': 729, 'commitOffsets': 14, 'getBatch': 5, 'latestOffset': 19, 'queryPlanning': 5, 'triggerExecution': 787, 'walCommit': 14}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 3}, 'endOffset': {'logOffset': 4}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1270.6480304955526}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 1270.6480304955526
Last progress: {'id': 'ec94b90f-41a7-40d8-9554-73ad9a1a566f', 'runId': '0e47383f-3a33-4315-9b61-abdff0474301', 'name': None, 'timestamp': '2024-12-27T20:18:50.001Z', 'batchId': 4, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1270.6480304955526, 'durationMs': {'addBatch': 729, 'commitOffsets': 14, 'getBatch': 5, 'latestOffset': 19, 'queryPlanning': 5, 'triggerExecution': 787, 'walCommit': 14}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 3}, 'endOffset': {'logOffset': 4}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1270.6480304955526}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 1270.6480304955526
Last progress: {'id': 'ec94b90f-41a7-40d8-9554-73ad9a1a566f', 'runId': '0e47383f-3a33-4315-9b61-abdff0474301', 'name': None, 'timestamp': '2024-12-27T20:19:10.000Z', 'batchId': 5, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'durationMs': {'latestOffset': 3, 'triggerExecution': 4}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 4}, 'endOffset': {'logOffset': 4}, 'latestOffset': None, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 0.0
Time since last data processed: 5.001832962036133 seconds.
Last progress: {'id': 'ec94b90f-41a7-40d8-9554-73ad9a1a566f', 'runId': '0e47383f-3a33-4315-9b61-abdff0474301', 'name': None, 'timestamp': '2024-12-27T20:19:10.000Z', 'batchId': 5, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'durationMs': {'latestOffset': 3, 'triggerExecution': 4}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 4}, 'endOffset': {'logOffset': 4}, 'latestOffset': None, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 0.0
Time since last data processed: 10.003771305084229 seconds.
Last progress: {'id': 'ec94b90f-41a7-40d8-9554-73ad9a1a566f', 'runId': '0e47383f-3a33-4315-9b61-abdff0474301', 'name': None, 'timestamp': '2024-12-27T20:19:10.000Z', 'batchId': 5, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'durationMs': {'latestOffset': 3, 'triggerExecution': 4}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 4}, 'endOffset': {'logOffset': 4}, 'latestOffset': None, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 0.0
Time since last data processed: 15.0054292678833 seconds.
INFO:py4j.clientserver:Closing down clientserver connection
Last progress: {'id': 'ec94b90f-41a7-40d8-9554-73ad9a1a566f', 'runId': '0e47383f-3a33-4315-9b61-abdff0474301', 'name': None, 'timestamp': '2024-12-27T20:19:10.000Z', 'batchId': 5, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'durationMs': {'latestOffset': 3, 'triggerExecution': 4}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 4}, 'endOffset': {'logOffset': 4}, 'latestOffset': None, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 0.0
Time since last data processed: 20.00701928138733 seconds.
No new data for 20 seconds. Stopping the stream.
Stream has stopped.
Both processes have finished.

Method 1 - History Version¶

In [108]:
historic_version_file = show_historic_version_from_delta_file(spark_session, delta_output_streaming_path)
historic_version_file
Out[108]:
version timestamp userId userName operation operationParameters job notebook clusterId readVersion isolationLevel isBlindAppend operationMetrics userMetadata engineInfo
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
In [109]:
total_versions_file = max(historic_version_file.version)
total_operations_file = historic_version_file.operation.to_list()
total_operations_file.reverse()

for version in range(total_versions_file + 1):
    print(f"Version : {version}, Operation : {total_operations_file[version]}, Content")
    display(show_historic_version_from_delta_file(spark_session, delta_output_streaming_path, version))
    print()
Version : 0, Operation : STREAMING UPDATE, Content
Warning: Column 'None' not found in the DataFrame.
id secure_code airline departure_city departure_date arrival_airport arrival_city arrival_time passenger_name passenger_gender seat_number currency departure_gate flight_status co_pilot_name aircraft_type fuel_consumption flight_id flight_number departure_airport departure_country departure_time arrival_country arrival_date flight_duration passenger_age passenger_nationality ticket_price baggage_weight arrival_gate pilot_name cabin_crew_count aircraft_registration flight_distance
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 1, Operation : STREAMING UPDATE, Content
Warning: Column 'None' not found in the DataFrame.
aircraft_registration aircraft_type airline arrival_airport arrival_city arrival_country arrival_date arrival_gate arrival_time baggage_weight cabin_crew_count co_pilot_name currency departure_airport departure_city departure_country departure_date departure_gate departure_time flight_distance flight_duration flight_id flight_number flight_status fuel_consumption id passenger_age passenger_gender passenger_name passenger_nationality pilot_name seat_number secure_code ticket_price
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 2, Operation : STREAMING UPDATE, Content
Warning: Column 'None' not found in the DataFrame.
aircraft_registration aircraft_type airline arrival_airport arrival_city arrival_country arrival_date arrival_gate arrival_time baggage_weight cabin_crew_count co_pilot_name currency departure_airport departure_city departure_country departure_date departure_gate departure_time flight_distance flight_duration flight_id flight_number flight_status fuel_consumption id passenger_age passenger_gender passenger_name passenger_nationality pilot_name seat_number secure_code ticket_price
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 3, Operation : STREAMING UPDATE, Content
Warning: Column 'None' not found in the DataFrame.
aircraft_registration aircraft_type airline arrival_airport arrival_city arrival_country arrival_date arrival_gate arrival_time baggage_weight cabin_crew_count co_pilot_name currency departure_airport departure_city departure_country departure_date departure_gate departure_time flight_distance flight_duration flight_id flight_number flight_status fuel_consumption id passenger_age passenger_gender passenger_name passenger_nationality pilot_name seat_number secure_code ticket_price
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 4, Operation : STREAMING UPDATE, Content
Warning: Column 'None' not found in the DataFrame.
aircraft_registration aircraft_type airline arrival_airport arrival_city arrival_country arrival_date arrival_gate arrival_time baggage_weight cabin_crew_count co_pilot_name currency departure_airport departure_city departure_country departure_date departure_gate departure_time flight_distance flight_duration flight_id flight_number flight_status fuel_consumption id passenger_age passenger_gender passenger_name passenger_nationality pilot_name seat_number secure_code ticket_price
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Method 2 - Change Data Feed Version Validation and Control¶

In [110]:
total_versions_file = max(historic_version_file.version)
total_operations_file = historic_version_file.operation.to_list()
total_operations_file.reverse()

for version in range(total_versions_file + 1):
    print(f"Version : {version}, Operation : {total_operations_file[version]}, Content")
    display(read_delta_table_with_change_data_control(spark_session, delta_output_streaming_path, version, version))
    print()
Version : 0, Operation : STREAMING UPDATE, Content
id secure_code airline departure_city departure_date arrival_airport arrival_city arrival_time passenger_name passenger_gender seat_number currency departure_gate flight_status co_pilot_name aircraft_type fuel_consumption flight_id flight_number departure_airport departure_country departure_time arrival_country arrival_date flight_duration passenger_age passenger_nationality ticket_price baggage_weight arrival_gate pilot_name cabin_crew_count aircraft_registration flight_distance _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 1, Operation : STREAMING UPDATE, Content
id secure_code airline departure_city departure_date arrival_airport arrival_city arrival_time passenger_name passenger_gender seat_number currency departure_gate flight_status co_pilot_name aircraft_type fuel_consumption flight_id flight_number departure_airport departure_country departure_time arrival_country arrival_date flight_duration passenger_age passenger_nationality ticket_price baggage_weight arrival_gate pilot_name cabin_crew_count aircraft_registration flight_distance _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 2, Operation : STREAMING UPDATE, Content
id secure_code airline departure_city departure_date arrival_airport arrival_city arrival_time passenger_name passenger_gender seat_number currency departure_gate flight_status co_pilot_name aircraft_type fuel_consumption flight_id flight_number departure_airport departure_country departure_time arrival_country arrival_date flight_duration passenger_age passenger_nationality ticket_price baggage_weight arrival_gate pilot_name cabin_crew_count aircraft_registration flight_distance _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 3, Operation : STREAMING UPDATE, Content
id secure_code airline departure_city departure_date arrival_airport arrival_city arrival_time passenger_name passenger_gender seat_number currency departure_gate flight_status co_pilot_name aircraft_type fuel_consumption flight_id flight_number departure_airport departure_country departure_time arrival_country arrival_date flight_duration passenger_age passenger_nationality ticket_price baggage_weight arrival_gate pilot_name cabin_crew_count aircraft_registration flight_distance _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 4, Operation : STREAMING UPDATE, Content
id secure_code airline departure_city departure_date arrival_airport arrival_city arrival_time passenger_name passenger_gender seat_number currency departure_gate flight_status co_pilot_name aircraft_type fuel_consumption flight_id flight_number departure_airport departure_country departure_time arrival_country arrival_date flight_duration passenger_age passenger_nationality ticket_price baggage_weight arrival_gate pilot_name cabin_crew_count aircraft_registration flight_distance _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

SPARK MLLIB + SCIKIT LEARN¶

LIBRARIES¶

In [111]:
from time import sleep
from typing import Union

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
    RegressionEvaluator,
)
from pyspark.ml import Model
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, format_number, lit, udf, when
from pyspark.sql.types import FloatType, StringType, StructField, StructType
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, roc_curve, roc_auc_score
import seaborn as sns

FUNCTIONS¶

In [112]:
def create_dataframe_from_dataset(
    dataset,
    dataset_data_key="data",
    dataset_columns=["feature_1", "feature_2", "feature_3", "feature_4"],
    dataset_data_target="target",
):
    """
    Create a pandas DataFrame from a dataset dictionary.

    Args:
    - dataset (dict): The dataset dictionary containing data and target keys.
    - dataset_data_key (str): The key in the dataset dictionary corresponding to the data.
    - dataset_columns (list): List of column names for the DataFrame.
    - dataset_data_target (str): The key in the dataset dictionary corresponding to the target.

    Returns:
    - df (DataFrame): A pandas DataFrame containing the data and target from the dataset dictionary.
    """
    # Create DataFrame
    df = pd.DataFrame(dataset[dataset_data_key], columns=dataset_columns)
    df[dataset_data_target] = dataset[dataset_data_target]

    return df

def create_multiple_samples_datasets(
    original_dataset, n_datasets, n_records, delay_time=5
):
    """
    Create multiple datasets from an original dataset.

    Args:
    - original_dataset (Bunch): The original dataset from which to create new datasets.
    - n_datasets (int): Number of datasets to create.
    - n_records (int): Number of records in each new dataset.
    - delay_time (int): Time to wait (in seconds) between creating each dataset.

    Returns:
    - datasets (list): A list of pandas DataFrames, each representing a new dataset.
    """
    datasets = []

    for dataset_index in range(n_datasets):
        # Create a new dataset with n random records
        indices = np.random.choice(
            range(len(original_dataset.data)), size=n_records, replace=False
        )
        data = original_dataset.data[indices]
        target = original_dataset.target[indices]
        new_dataset = pd.DataFrame(data, columns=original_dataset.feature_names)
        new_dataset["target"] = target
        datasets.append(new_dataset)

        # Write new dataset to a CSV file
        new_dataset.to_csv(f"new_dataset_{dataset_index}.csv", index=False)

        # Wait for a specified delay time
        sleep(delay_time)

    return datasets


def load_dataset():
    """
    Load the Iris dataset and display it as a pandas DataFrame.

    Returns:
    - data_features (array): Array containing the features of the Iris dataset.
    - target_classes (array): Array containing the target classes of the Iris dataset.
    """
    # Load the Iris dataset
    dataset = load_iris()
    data_features, target_classes = dataset.data, dataset.target
    key_column = "data"
    features_columns = ["feature_1", "feature_2", "feature_3", "feature_4"]
    taget_column = "target"

    create_dataframe_from_dataset(
        dataset,
        dataset_data_key=key_column,
        dataset_columns=features_columns,
        dataset_data_target=taget_column,
    )

    return data_features, target_classes


def load_dataset_from_csv(csv_file):
    """
    Load the Iris dataset from a CSV file and display it as a pandas DataFrame.

    Args:
        csv_file (str): Path to the CSV file containing the Iris dataset.

    Returns:
        - data_features (array): Array containing the features of the Iris dataset.
        - target_classes (array): Array containing the target classes of the Iris dataset.
    """
    # Load the Iris dataset from the CSV file
    df = pd.read_csv(csv_file)

    # Separate the feature columns and the target column
    features_columns = [
        "sepal length (cm)",
        "sepal width (cm)",
        "petal length (cm)",
        "petal width (cm)",
    ]
    target_column = "class"

    data_features = df[features_columns].values
    target_classes = df[target_column].values

    return data_features, target_classes


def split_dataset_train_testing(data_features, target_classes):
    """
    Split the dataset into training and testing sets.

    Args:
    - data_features (array): Array containing the features of the dataset.
    - target_classes (array): Array containing the target classes of the dataset.

    Returns:
    - data_features_train_splitted (array): Features of the training set.
    - data_features_test_splitted (array): Features of the testing set.
    - target_classes_train_splitted (array): Target classes of the training set.
    - target_classes_test_splitted (array): Target classes of the testing set.
    """
    # Split the data into training and testing sets
    (
        data_features_train_splitted,
        data_features_test_splitted,
        target_classes_train_splitted,
        target_classes_test_splitted,
    ) = train_test_split(data_features, target_classes, test_size=0.2, random_state=42)

    return (
        data_features_train_splitted,
        data_features_test_splitted,
        target_classes_train_splitted,
        target_classes_test_splitted,
    )

def get_filtered_dataset(data_features_dataset, target_classes_dataset):
    """
    Convert dataset features and target classes to appropriate formats.

    Args:
    - data_features_dataset (array): Array containing the features of the dataset.
    - target_classes_dataset (array): Array containing the target classes of the dataset.

    Returns:
    - data_features_filtered (list): List of feature vectors converted to Vectors.dense format.
    - target_classes_filtered (list): List of target classes converted to floats.
    """
    # Convert numpy values to Python floats
    data_features_filtered = [
        Vectors.dense(features) for features in data_features_dataset
    ]
    target_classes_filtered = [
        float(label) for label in target_classes_dataset
    ]  # Convert to float

    return data_features_filtered, target_classes_filtered


def create_spark_dataframe(
    sparkml_session, data_features_dataset, target_classes_dataset
):
    """
    Create a Spark DataFrame from the dataset features and target classes.

    Args:
    - data_features_dataset (array): Array containing the features of the dataset.
    - target_classes_dataset (array): Array containing the target classes of the dataset.

    Returns:
    - final_data_set (DataFrame): A Spark DataFrame containing the features and target classes.
    """
    # Create Spark DataFrames from training data
    data_set = list(zip(data_features_dataset, target_classes_dataset))
    schema = StructType(
        [
            StructField("features", VectorUDT(), True),
            StructField("label", FloatType(), True),
        ]
    )
    final_data_set = sparkml_session.createDataFrame(data_set, schema=schema)

    return final_data_set

def create_ml_trained_model(model_class: type[Model], train_df: DataFrame, model_params: dict) -> Model:
    """
    Create and train an ML model using the specified model class and parameters.

    Args:
    - model_class (type[Model]): The ML model class to instantiate (e.g., LogisticRegression, DecisionTreeClassifier).
    - train_df (DataFrame): DataFrame containing the training data.
    - model_params (Dict): Dictionary of parameters to configure the model.

    Returns:
    - Model: The trained ML model (type will depend on the model class).
    """
    # Instantiate the model with the provided parameters
    model_instance = model_class(**model_params)
    
    # Train the model using the provided training DataFrame
    trained_model = model_instance.fit(train_df)

    return trained_model

def model_prediction(model, test_df, target_names):
    """
    Make predictions using a trained model and display the results.

    Args:
    - model: Trained machine learning model.
    - test_df (DataFrame): DataFrame containing the test data.

    Returns:
    - prediction_result (DataFrame): DataFrame containing the prediction results.
    """
    # Make predictions on the test data
    prediction_result = model.transform(test_df)
    # Format prediction column to display two decimal places
    prediction_result = prediction_result.withColumn(
        "formatted_prediction", format_number("prediction", 2)
    )

    # Map numeric labels back to target names
    label_to_name_udf = udf(lambda label: target_names[int(label)], StringType())

    # Create new columns with predicted and actual class names
    prediction_result = prediction_result.withColumn(
        "predicted_class_name", label_to_name_udf("prediction")
    )
    prediction_result = prediction_result.withColumn(
        "actual_class_name", label_to_name_udf("label")
    )

    # Add new column to indicate correct or incorrect classification
    prediction_result = prediction_result.withColumn(
        "correctly_classified",
        when(
            prediction_result["predicted_class_name"]
            == prediction_result["actual_class_name"],
            "✓",
        ).otherwise("❌"),
    )

    # Display the results
    prediction_result.show()

    return prediction_result

def mse_rmse_mae_r2_avaluator(prediction_result):
    """
    Evaluate regression metrics including MSE, RMSE, MAE, and R2 score, 
    and store the results in a pandas DataFrame.

    Args:
    - prediction_result (DataFrame): DataFrame containing the prediction results.

    Returns:
    - pandas.DataFrame: DataFrame with the evaluation metrics and their values.
    """
    # Initialize evaluators
    regression_evaluator_mse = RegressionEvaluator(
        predictionCol="prediction", labelCol="label", metricName="mse"
    )
    regression_evaluator_rmse = RegressionEvaluator(
        predictionCol="prediction", labelCol="label", metricName="rmse"
    )
    regression_evaluator_mae = RegressionEvaluator(
        predictionCol="prediction", labelCol="label", metricName="mae"
    )
    r2_evaluator = RegressionEvaluator(
        predictionCol="prediction", labelCol="label", metricName="r2"
    )
    
    # Calculate metrics and round to 4 decimal places
    mse = round(regression_evaluator_mse.evaluate(prediction_result), 4) * 100
    rmse = round(regression_evaluator_rmse.evaluate(prediction_result), 4) * 100
    mae = round(regression_evaluator_mae.evaluate(prediction_result), 4) * 100
    r2 = round(r2_evaluator.evaluate(prediction_result), 4) * 100

    # Print metrics with 4 decimals
    print(f"Mean Squared Error (Regression): {mse:.4f} %")
    print(f"Root Mean Squared Error (Regression): {rmse:.4f} %")
    print(f"Mean Absolute Error (Regression): {mae:.4f} %")
    print(f"R2 Score (Regression): {r2:.4f} %")
    
    # Create pandas DataFrame with 4 decimal places
    metrics_df = pd.DataFrame({
        "Metric": ["Mean Squared Error", "Root Mean Squared Error", "Mean Absolute Error", "R2 Score"],
        "Value (%)": [mse, rmse, mae, r2]
    })
    
    # Display DataFrame with 4 decimals
    display(metrics_df)
    return metrics_df

def accuracy_f1Score_precision_recall_evaluator(prediction_result):
    """
    Evaluate model performance using accuracy, F1 score, precision, and recall metrics,
    and store the results in a pandas DataFrame.

    Args:
    - prediction_result (DataFrame): DataFrame containing the prediction results.

    Returns:
    - pandas.DataFrame: DataFrame with the evaluation metrics and their values.
    """
    # Initialize evaluators
    evaluator_accuracy = MulticlassClassificationEvaluator(
        predictionCol="prediction", labelCol="label", metricName="accuracy"
    )
    evaluator_f1 = MulticlassClassificationEvaluator(
        predictionCol="prediction", labelCol="label", metricName="f1"
    )
    evaluator_precision = MulticlassClassificationEvaluator(
        predictionCol="prediction", labelCol="label", metricName="weightedPrecision"
    )
    evaluator_recall = MulticlassClassificationEvaluator(
        predictionCol="prediction", labelCol="label", metricName="weightedRecall"
    )
    
    # Calculate metrics and round to 4 decimal places
    accuracy = round(evaluator_accuracy.evaluate(prediction_result), 4) * 100
    f1_score = round(evaluator_f1.evaluate(prediction_result), 4) * 100
    precision = round(evaluator_precision.evaluate(prediction_result), 4) * 100
    recall = round(evaluator_recall.evaluate(prediction_result), 4) * 100

    # Print metrics with 4 decimals
    print(f"Accuracy: {accuracy:.4f} %")
    print(f"F1 Score: {f1_score:.4f} %")
    print(f"Precision: {precision:.4f} %")
    print(f"Recall: {recall:.4f} %")
    
    # Create pandas DataFrame with 4 decimal places
    metrics_df = pd.DataFrame({
        "Metric": ["Accuracy", "F1 Score", "Precision", "Recall"],
        "Value (%)": [accuracy, f1_score, precision, recall]
    })
    
    # Display DataFrame with 4 decimals
    display(metrics_df)
    return metrics_df

def calculate_and_plot_confusion_matrix(prediction_result, class_names):
    """
    Calculate the confusion matrix using Spark's MulticlassMetrics 
    and plot it using Seaborn with correct class names.

    Parameters:
        prediction_result (DataFrame): Spark DataFrame with "prediction" and "label" columns.
        class_names (dict): Dictionary mapping class indices to class names.
    """
    # Extract predictions and labels as RDD
    prediction_and_label = prediction_result.select("prediction", "label").rdd.map(
        lambda row: (float(row["prediction"]), float(row["label"]))
    )

    # Create a MulticlassMetrics object
    metrics = MulticlassMetrics(prediction_and_label)

    # Calculate the confusion matrix
    confusion_matrix = metrics.confusionMatrix().toArray()

    # Print confusion matrix values
    print("Confusion Matrix:")
    print(confusion_matrix)
    
    # Define class labels for the axes
    labels = [class_names[key] for key in sorted(class_names.keys())]

    # Plot confusion matrix using Seaborn
    plt.figure(figsize=(8, 6))
    sns.heatmap(confusion_matrix, annot=True, fmt="g", cmap="Blues", xticklabels=labels, yticklabels=labels)
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.show()

def calculate_and_plot_area_under_curve(prediction_result, class_names):
    # Create a BinaryClassificationEvaluator to calculate ROC AUC
    binary_evaluator = BinaryClassificationEvaluator(
        rawPredictionCol="prediction", labelCol="label"
    )
    # Evaluate and calculate the Area Under ROC (AUC)
    roc_auc = binary_evaluator.evaluate(
        prediction_result, {binary_evaluator.metricName: "areaUnderROC"}
    )
    # Print the calculated ROC AUC value
    print(f"General Area Under ROC: {roc_auc:.4f} * 100 %")

    # Extract raw predictions and true labels from the prediction result
    raw_predictions = prediction_result.select("rawPrediction", "label").rdd.map(
        lambda row: (row["rawPrediction"], row["label"])
    ).collect()

    # Extract predicted probabilities and true labels into arrays
    predictions = np.array([prediction[0] for prediction in raw_predictions])
    labels = np.array([prediction[1] for prediction in raw_predictions])

    # Initialize a list to store AUC scores for each class
    auc_scores = []

    # Create a figure for plotting ROC curves
    plt.figure(figsize=(12, 6))

    # Iterate through each class and plot the corresponding ROC curve
    for i, class_label in enumerate(class_names.values()):
        # Compute the ROC curve for each class (one-vs-rest)
        fpr, tpr, thresholds = roc_curve(labels == i, predictions[:, i])
        auc_score = roc_auc_score(labels == i, predictions[:, i])
        
        # Add the AUC score to the list
        auc_scores.append(auc_score)

        # Print the AUC score for the current class
        print(f"AUC for class {class_label}: {auc_score:.2f}")

        # Plot the ROC curve for the current class
        plt.subplot(1, len(class_names), i + 1)
        plt.plot(fpr, tpr, label=f'{class_label} (AUC = {auc_score:.2f})')
        plt.plot([0, 1], [0, 1], 'k--')  # Diagonal line for random classifier
        plt.xlabel('False Positive Rate (FPR)')
        plt.ylabel('True Positive Rate (TPR)')
        plt.title(f'ROC Curve - {class_label}')
        plt.legend(loc="lower right")

    # Calculate and print the average AUC score across all classes
    avg_auc = np.mean(auc_scores)
    print(f"Average AUC (based on the average of all individual curves): {avg_auc:.2f}")

    # Adjust layout to make the plots fit
    plt.tight_layout()
    # Show the ROC curve plots
    plt.show()


def area_under_curve_confusion_matrix_evaluator(prediction_result, class_names):
    # Call the function to calculate and plot Area Under ROC and confusion matrix
    calculate_and_plot_area_under_curve(prediction_result, class_names)
    calculate_and_plot_confusion_matrix(prediction_result, class_names)

def create_pandas_dataframe(spark_dataframe):
    """
    Convert a Spark DataFrame to a Pandas DataFrame and display the results.

    Args:
    - spark_dataframe (DataFrame): Spark DataFrame to be converted.

    Returns:
    - df_pandas (DataFrame): Pandas DataFrame containing the data from the Spark DataFrame.
    """
    # Display the results like a pandas DataFrame
    df_pandas = spark_dataframe.toPandas()
    display(df_pandas)
    return df_pandas

def display_classification_results(df_results):
    """
    Display classification results based on the DataFrame provided.

    Args:
    - df_results (DataFrame): DataFrame containing classification results.

    Returns:
    - None
    """
    # Obtain unique classes from the 'correctly_classified' column
    unique_classes = df_results['correctly_classified'].unique()

    # Print total number of unique classes
    print("Total unique classes:", len(unique_classes))
    print("Unique classes:", unique_classes)
    print("Total Records on Dataset:", df_results.shape[0])

    # Iterate over each unique class and calculate the percentage of records classified as that class
    for class_value in unique_classes:
        # Filter DataFrame to get records classified as the current class
        class_records = df_results.query(f'correctly_classified == "{class_value}"')
        
        # Calculate percentage of records classified as the current class
        total_percentage = (class_records.shape[0] / df_results.shape[0]) * 100
        
        # Print the number of records classified as the current class and the percentage
        print(f'Records classified "{class_value}": {class_records.shape[0]}/{df_results.shape[0]} = {round(total_percentage, 2)}%')

def save_spark_model_and_metadata_to_separate_delta_dirs(spark_session, model, model_path, model_metadata_delta_path, overwrite=False):
    """
    Save a trained Spark model to the specified path and store its metadata in a separate Delta path.

    Args:
    - spark_session (SparkSession): The existing Spark session to use.
    - model (pyspark.ml.Model): The trained Spark model to save.
    - model_path (str): The path where the model will be saved.
    - model_metadata_delta_path (str): The path where the model metadata will be saved in Delta format.
    - overwrite (bool, optional): Whether to overwrite the model if it already exists. Default is False.

    Returns:
    - None
    """
    # Save the model to the specified path
    if overwrite:
        model.write().overwrite().save(model_path)
        print(f"ML Model successfully overwritten at: {model_path}")

    else:
        model.write().save(model_path)
        print(f"ML Model saved successfully at: {model_path}")

    # Save model metadata in Delta format (optional)
    model_metadata = spark_session.createDataFrame([(model_path,)], ["model_path"])

    if overwrite:
        model_metadata.write.format("delta").mode("overwrite").save(model_metadata_delta_path)
    else:
        model_metadata.write.format("delta").mode("append").save(model_metadata_delta_path)

    print(f"Model Delta metadata saved at: {model_metadata_delta_path}")
    print()


def load_spark_model(model_path: str, model_class: LogisticRegressionModel) -> Union[Model, None]:
    """
    Load a trained Spark model from the specified path.

    Args:
    - model_path (str): The path from where the model will be loaded.
    - model_class (LogisticRegressionModel): The model class (LogisticRegressionModel in this case).

    Returns:
    - Union[LogisticRegressionModel, None]: The loaded Spark model or None if loading fails.
    """
    try:
        # Load the model from the specified path using the provided model class
        model = model_class.load(model_path)
        print(f"Model loaded with {model_class}, successfully from: {model_path}")
        return model
    except Exception as e:
        print(f"Error loading model: {e}")
        return None

RUNNING ML PROCCESS¶

In [113]:
# Step 1: Load the dataset
data_features, target_classes = load_dataset()

# Step 2: Load the sample dataset for external source
dataset_file = "https://raw.githubusercontent.com/JorgeCardona/recursos/main/datasets/iris_dataset.csv"
data_features, target_classes = load_dataset_from_csv(dataset_file)

# Step 3: Split the dataset into training and testing sets
(
    data_features_train_splitted,
    data_features_test_splitted,
    target_classes_train_splitted,
    target_classes_test_splitted,
) = split_dataset_train_testing(data_features, target_classes)

# Step 4: Filter the split datasets
data_features_train, target_classes_train = get_filtered_dataset(
    data_features_train_splitted, target_classes_train_splitted
)
data_features_test, target_classes_test = get_filtered_dataset(
    data_features_test_splitted, target_classes_test_splitted
)

# Step 5: Create Spark train and avlidation dataframes from the filtered datasets
# 5.1 Dataset for Training the Model
train_df = create_spark_dataframe(
    spark_session, data_features_train, target_classes_train
)
# 5.2 Dataset for Validating the Model
test_df = create_spark_dataframe(
    spark_session, data_features_test, target_classes_test
)

# Step 6: Train a machine learning model
model_params = {
    "featuresCol": "features",
    "labelCol": "label",
    "maxIter": 10,
    "regParam": 0.3,
    "elasticNetParam": 0.8,
}

model_trained = create_ml_trained_model(
    model_class=LogisticRegression,
    train_df=train_df,
    model_params=model_params
)

# Step 7: Export the trained model to a specified location
model_path = "warehouse-spark/spark_ml_models/models/dataset_iris"
model_metadata_delta_path ="warehouse-spark/spark_ml_models/delta/dataset_iris"

# to update model multiple times
for iteration in range(3):
    save_spark_model_and_metadata_to_separate_delta_dirs(spark_session, model_trained, model_path, model_metadata_delta_path, overwrite=True)

# Step 8: Load the trained model from the specified location
model_class = LogisticRegressionModel
model = load_spark_model(model_path, model_class)

# Step 9: Make predictions using the loaded model
target_names = ["setosa", "versicolor", "virginica"]
prediction_result = model_prediction(model, test_df, target_names)

# Step 10: Evaluate the model's performance on specified evaluation metrics
mse_rmse_mae_r2_avaluator(prediction_result)
accuracy_f1Score_precision_recall_evaluator(prediction_result)

# Step 11: Define class names and Evaluate the model's performance on specified evaluation metrics
# Define the class names based on the specific class labels you mentioned
class_names = {0.0: 'setosa', 1.0: 'versicolor', 2.0: 'virginica'}
area_under_curve_confusion_matrix_evaluator(prediction_result, class_names)

# Step 12: Create a Pandas dataframe from the resulting Spark dataframe of predictions
df_results = create_pandas_dataframe(spark_dataframe=prediction_result)

# Step 13: Check the results and Interpret the model's results and analyze the findings using evaluation metrics
display_classification_results(df_results)
24/12/27 20:19:41 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
ML Model successfully overwritten at: warehouse-spark/spark_ml_models/models/dataset_iris
Model Delta metadata saved at: warehouse-spark/spark_ml_models/delta/dataset_iris

ML Model successfully overwritten at: warehouse-spark/spark_ml_models/models/dataset_iris
Model Delta metadata saved at: warehouse-spark/spark_ml_models/delta/dataset_iris

ML Model successfully overwritten at: warehouse-spark/spark_ml_models/models/dataset_iris
Model Delta metadata saved at: warehouse-spark/spark_ml_models/delta/dataset_iris

Model loaded with <class 'pyspark.ml.classification.LogisticRegressionModel'>, successfully from: warehouse-spark/spark_ml_models/models/dataset_iris
+--------------------+-----+--------------------+--------------------+----------+--------------------+--------------------+-----------------+--------------------+
|            features|label|       rawPrediction|         probability|prediction|formatted_prediction|predicted_class_name|actual_class_name|correctly_classified|
+--------------------+-----+--------------------+--------------------+----------+--------------------+--------------------+-----------------+--------------------+
|[6.34,3.03,4.29,1.1]|  1.0|[-0.5405590083053...|[0.28644828464755...|       1.0|                1.00|          versicolor|       versicolor|                   ✓|
|[6.08,2.62,3.93,1...|  1.0|[-0.5466408257513...|[0.27689074407750...|       2.0|                2.00|           virginica|       versicolor|                   ❌|
|[5.24,2.6,4.74,1.11]|  1.0|[-0.6615694617345...|[0.26211433883602...|       1.0|                1.00|          versicolor|       versicolor|                   ✓|
|[4.75,3.12,1.32,0...|  0.0|[0.47294985742743...|[0.54758662353233...|       0.0|                0.00|              setosa|           setosa|                   ✓|
|[6.76,3.21,5.29,1...|  2.0|[-1.0773019594656...|[0.17272724854531...|       2.0|                2.00|           virginica|        virginica|                   ✓|
|[5.66,2.61,3.88,1.3]|  1.0|[-0.4979637814550...|[0.28977899348067...|       1.0|                1.00|          versicolor|       versicolor|                   ✓|
|[7.34,2.9,4.08,0.97]|  1.0|[-0.4435353588106...|[0.31025496235700...|       1.0|                1.00|          versicolor|       versicolor|                   ✓|
|[7.72,3.23,6.03,2...|  2.0|[-1.3583358480790...|[0.13154999753581...|       2.0|                2.00|           virginica|        virginica|                   ✓|
| [5.0,2.92,1.42,0.2]|  0.0|[0.50178183887836...|[0.55956091035566...|       0.0|                0.00|              setosa|           setosa|                   ✓|
|[7.35,3.16,5.4,1.99]|  2.0|[-1.1190335437155...|[0.16603723307095...|       2.0|                2.00|           virginica|        virginica|                   ✓|
|[5.42,3.47,5.41,2...|  2.0|[-1.1701838678711...|[0.15614141190010...|       2.0|                2.00|           virginica|        virginica|                   ✓|
|[5.15,3.75,1.46,0...|  0.0|[0.47189971616237...|[0.55048494054842...|       0.0|                0.00|              setosa|           setosa|                   ✓|
|[4.48,3.45,1.65,0...|  0.0|[0.39305266828324...|[0.52831027614363...|       0.0|                0.00|              setosa|           setosa|                   ✓|
|[6.59,3.31,4.13,1...|  1.0|[-0.5407455069699...|[0.28293832367297...|       1.0|                1.00|          versicolor|       versicolor|                   ✓|
|[5.22,3.27,6.14,2...|  2.0|[-1.4097740531015...|[0.12462871550908...|       2.0|                2.00|           virginica|        virginica|                   ✓|
|[5.28,3.43,1.26,0...|  0.0|[0.55983506484927...|[0.57520218844506...|       0.0|                0.00|              setosa|           setosa|                   ✓|
|[5.77,2.47,5.02,1.6]|  1.0|[-0.8933931025491...|[0.20870144707264...|       2.0|                2.00|           virginica|       versicolor|                   ❌|
|[4.92,3.38,1.19,0.2]|  0.0|[0.56197790561056...|[0.57433893507614...|       0.0|                0.00|              setosa|           setosa|                   ✓|
|[6.31,3.28,6.03,2...|  2.0|[-1.3648069285941...|[0.13047120321374...|       2.0|                2.00|           virginica|        virginica|                   ✓|
|[6.79,3.15,6.45,1...|  2.0|[-1.3711928926468...|[0.13517439851462...|       2.0|                2.00|           virginica|        virginica|                   ✓|
+--------------------+-----+--------------------+--------------------+----------+--------------------+--------------------+-----------------+--------------------+
only showing top 20 rows

Mean Squared Error (Regression): 15.0000 %
Root Mean Squared Error (Regression): 38.7300 %
Mean Absolute Error (Regression): 15.0000 %
R2 Score (Regression): 77.6500 %
Metric Value (%)
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Accuracy: 85.0000 %
F1 Score: 84.1700 %
Precision: 89.8000 %
Recall: 85.0000 %
Metric Value (%)
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
General Area Under ROC: 0.9973 * 100 %
                                                                                
AUC for class setosa: 1.00
AUC for class versicolor: 0.50
AUC for class virginica: 1.00
Average AUC (based on the average of all individual curves): 0.83
No description has been provided for this image
/usr/local/lib/python3.12/site-packages/pyspark/sql/context.py:158: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.
  warnings.warn(
Confusion Matrix:
[[225.   0.   0.]
 [  2. 104.  88.]
 [  0.   0. 181.]]
No description has been provided for this image
                                                                                
features label rawPrediction probability prediction formatted_prediction predicted_class_name actual_class_name correctly_classified
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Total unique classes: 2
Unique classes: ['✓' '❌']
Total Records on Dataset: 600
Records classified "✓": 510/600 = 85.0%
Records classified "❌": 90/600 = 15.0%

METRICS ANALYSIS¶

Metric Description When to Use Value Value Explanation Formula
Mean Squared Error (MSE) The mean of the squared error between predictions and actual values. A lower value indicates better predictions. In regression problems where large errors need to be penalized more, e.g., in housing price prediction. 15.0% MSE calculates the average of squared errors between the model predictions and actual values. A lower value means the model predictions are closer to the actual values. ( MSE = \frac{1}{n} \sum_{i=1}^{n} (y_i - \hat{y}_i)^2 )
Mean Absolute Error (MAE) The mean of absolute errors between predictions and actual values. A lower value indicates better predictions. When avoiding outliers that may influence MSE, e.g., in product demand prediction. 15.0% MAE calculates the average of absolute differences between the model's predictions and actual values. An MAE of 15.0% indicates that, on average, the model's predictions deviate by 15%. ( MAE = \frac{1}{n} \sum_{i=1}^{n} [y_i - \hat{y}_i] )
Root Mean Squared Error (RMSE) The square root of the mean squared error. A lower value indicates better predictions. Similar to MSE but provides a more intuitive interpretation since it's on the same scale as the target variable. 38.73% RMSE is the square root of MSE, giving a measure of the average error in the same unit as the target variable. An RMSE of 38.73% means the model's predictions tend to deviate by 38.73%. ( RMSE = \sqrt{MSE} )
R² Score The percentage of variance in the target variable explained by the regression. A higher value indicates better explanation. Useful to understand how much variability in the target is explained by the model, e.g., in engine performance prediction. 77.65% The R² Score represents the proportion of variability in the target variable explained by the model. A value of 77.65% means the model explains 77.65% of the variability in the data. ( R^2 = 1 - \frac{SSE}{SST} )
Accuracy The proportion of correctly classified instances. A higher value indicates better accuracy. In binary or multiclass classification problems where all classes are equally important, e.g., in email spam detection. 85.0% Accuracy is the proportion of correctly classified instances. A value of 85.0% means the model correctly classifies 85.0% of instances. ( \text{Accuracy} = \frac{\text{Number of Correct Predictions}}{\text{Total Number of Predictions}} )
F1 Score The harmonic mean of recall and precision. A higher value indicates good precision and recall. When balancing precision and recall is needed in imbalanced classification problems, e.g., in disease detection in medical tests. 84.17% The F1 Score measures the balance between precision and recall. A value of 84.17% indicates a good balance between precision and recall in the model. ( F1 = 2 \times \frac{\text{Precision} \times \text{Recall}}{\text{Precision} + \text{Recall}} )
Precision The proportion of true positive instances among all instances classified as positive. A higher value indicates better precision. Important when the cost of false positives is high, e.g., in fraud detection in financial transactions. 89.8% Precision is the proportion of positive instances correctly identified by the model among all positive instances identified. A value of 89.8% indicates that 89.8% of instances identified as positive are truly positive. ( \text{Precision} = \frac{\text{True Positives}}{\text{True Positives} + \text{False Positives}} )
Recall The proportion of true positive instances among all actual positive instances. A higher value indicates better recall. Crucial when the cost of false negatives is high, e.g., in detecting severe diseases. 85.0% Recall is the proportion of positive instances correctly identified by the model among all actual positive instances. A value of 85.0% means the model correctly identifies 85.0% of all positive instances. ( \text{Recall} = \frac{\text{True Positives}}{\text{True Positives} + \text{False Negatives}} )
Area Under ROC (AUC-ROC) The area under the receiver operating characteristic curve, measuring the model's ability to discriminate between classes. A higher value indicates better discrimination. Useful for evaluating model performance across all classification threshold settings, e.g., in disease detection in laboratory tests. 99.73% AUC-ROC measures the model's ability to discriminate between classes. A value of 99.73% indicates the model has a very high capacity to distinguish between classes. Calculated by integrating the ROC curve, which plots the true positive rate against the false positive rate at different classification thresholds.
Confusion Matrix A matrix showing classification results. To evaluate the performance of a classification model. [[225. 0. 0.] [ 2. 104. 88.] [ 0. 0. 181.]] The confusion matrix shows the count of correctly and incorrectly classified instances for each class. In this case, there are 225 correctly classified instances of class 1, 104 of class 2, and 181 of class 3. Additionally, there are 2 instances of class 1 misclassified as class 2, 88 instances of class 2 misclassified as class 3, and no errors in classifying class 3.

RESTORE ML MODEL VERSION¶

In [114]:
restore_delta_lake_to_version(spark_session, model_metadata_delta_path, 0)
24/12/27 20:19:54 WARN DAGScheduler: Broadcasting large task binary with size 1079.1 KiB
Restored to version 0.
Out[114]:
format id name description location createdAt lastModified partitionColumns clusteringColumns numFiles sizeInBytes properties minReaderVersion minWriterVersion tableFeatures
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Method 1 - History Version¶

In [115]:
historic_version_file = show_historic_version_from_delta_file(spark_session, model_metadata_delta_path)
historic_version_file
Out[115]:
version timestamp userId userName operation operationParameters job notebook clusterId readVersion isolationLevel isBlindAppend operationMetrics userMetadata engineInfo
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
In [116]:
total_versions_file = max(historic_version_file.version)
total_operations_file = historic_version_file.operation.to_list()
total_operations_file.reverse()

for version in range(total_versions_file + 1):
    print(f"Version : {version}, Operation : {total_operations_file[version]}, Content")
    display(show_historic_version_from_delta_file(spark_session, model_metadata_delta_path, version))
    print()
Version : 0, Operation : WRITE, Content
Warning: Column 'None' not found in the DataFrame.
model_path
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 1, Operation : WRITE, Content
Warning: Column 'None' not found in the DataFrame.
model_path
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 2, Operation : WRITE, Content
Warning: Column 'None' not found in the DataFrame.
model_path
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 3, Operation : RESTORE, Content
Warning: Column 'None' not found in the DataFrame.
model_path
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

Method 2 - Change Data Feed Version Validation and Control¶

In [117]:
total_versions_file = max(historic_version_file.version)
total_operations_file = historic_version_file.operation.to_list()
total_operations_file.reverse()

for version in range(total_versions_file + 1):
    print(f"Version : {version}, Operation : {total_operations_file[version]}, Content")
    display(read_delta_table_with_change_data_control(spark_session, model_metadata_delta_path, version, version))
    print()
Version : 0, Operation : WRITE, Content
model_path _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 1, Operation : WRITE, Content
model_path _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 2, Operation : WRITE, Content
model_path _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
Version : 3, Operation : RESTORE, Content
model_path _change_type _commit_version _commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)

RELEASE ALL USED RESOURCES¶

In [118]:
# RELEASE MEMORY BY REMOVING SPARK DATAFRAMES FROM CACHE
# Release memory by unpersisting Spark DataFrames from the cache
spark_dataframe_sample.unpersist()  # Removes the cache of the 'spark_dataframe_sample'
spark_dataframe_sql.unpersist()  # Removes the cache of the 'spark_dataframe_sql'
spark_dataframe_delta.unpersist()  # Removes the cache of the 'spark_dataframe_delta'
spark_dataframe_joins.unpersist()  # Removes the cache of the 'spark_dataframe_joins'
spark_dataframe_uno.unpersist()  # Removes the cache of the 'spark_dataframe_uno'
spark_dataframe_dos.unpersist()  # Removes the cache of the 'spark_dataframe_dos'
spark_dataframe_uno_renamed.unpersist()  # Removes the cache of the 'spark_dataframe_uno_renamed'
spark_dataframe_dos_renamed.unpersist()  # Removes the cache of the 'spark_dataframe_dos_renamed'

# Stop the Spark session to release all associated resources
spark_session.stop()

Displays the total time taken for the entire process¶

In [119]:
end_time = time.time()    # Record the end time

execution_time = end_time - start_time  # Calculate the execution time
print(f"Total Execution time: {execution_time:.6f} seconds")
Total Execution time: 310.064593 seconds